You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2019/03/15 17:20:34 UTC

[nifi-minifi-cpp] branch master updated: MINIFICPP-748 - Nanofi: add custom C processor example

This is an automated email from the ASF dual-hosted git repository.

phrocker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new b5e47b8  MINIFICPP-748 - Nanofi: add custom C processor example
b5e47b8 is described below

commit b5e47b80fa69d6e36a0e552ec7b6ba86d69b6649
Author: Arpad Boda <ab...@hortonworks.com>
AuthorDate: Tue Mar 5 19:11:04 2019 +0100

    MINIFICPP-748 - Nanofi: add custom C processor example
    
    This closes #499.
    
    Signed-off-by: Marc Parisi <ph...@apache.org>
---
 nanofi/examples/CMakeLists.txt |   4 ++
 nanofi/examples/hash_file.c    | 121 +++++++++++++++++++++++++++++++++++++++++
 nanofi/src/api/nanofi.cpp      |  34 +++++++++---
 3 files changed, 151 insertions(+), 8 deletions(-)

diff --git a/nanofi/examples/CMakeLists.txt b/nanofi/examples/CMakeLists.txt
index 0b9ccde..a3db4d2 100644
--- a/nanofi/examples/CMakeLists.txt
+++ b/nanofi/examples/CMakeLists.txt
@@ -67,6 +67,10 @@ target_link_libraries(generate_flow nanofi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAG
 
 target_link_libraries(terminate_handler nanofi ${CMAKE_THREAD_LIBS_INIT} )
 
+add_executable(hash_file hash_file.c)
+
+target_link_libraries(hash_file nanofi ${CMAKE_THREAD_LIBS_INIT} )
+
 add_executable(transmit_flow transmit_flow.c)
 
 target_link_libraries(transmit_flow nanofi ${CMAKE_THREAD_LIBS_INIT}  ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
diff --git a/nanofi/examples/hash_file.c b/nanofi/examples/hash_file.c
new file mode 100644
index 0000000..a3e3311
--- /dev/null
+++ b/nanofi/examples/hash_file.c
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+    *
+    *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+#ifdef OPENSSL_SUPPORT
+
+#include "api/nanofi.h"
+#include <openssl/md5.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <sys/stat.h>
+
+/**
+ * Callback registered as the "md5proc" custom processor.
+ *
+ * Takes the next flow file from the session, computes the MD5 digest of its
+ * content and stores it, as 32 lowercase hex characters, in the attribute whose
+ * name is given by the processor's "checksum_attr_name" property. The flow file
+ * is then transferred to SUCCESS_RELATIONSHIP.
+ *
+ * If there is no flow file, the property is not set, memory cannot be
+ * allocated, or the content cannot be read, nothing is transferred; the
+ * flow file record is still released so it does not leak.
+ */
+void custom_processor_logic(processor_session * ps, processor_context * ctx) {
+  flow_file_record * ffr = get(ps, ctx);
+  if(ffr == NULL) {
+    return;
+  }
+
+  // Resolve the target attribute name first so no content is read when it is missing.
+  char prop_value[50];
+  if(get_property(ctx, "checksum_attr_name", prop_value, sizeof(prop_value)) != 0) {
+    free_flowfile(ffr); // Attr name not found
+    return;
+  }
+
+  // Always request at least one byte so that NULL unambiguously means failure
+  // (malloc(0) is allowed to return NULL).
+  uint8_t * buffer = (uint8_t*)malloc(ffr->size > 0 ? ffr->size : 1);
+  if(buffer == NULL) {
+    free_flowfile(ffr);
+    return;
+  }
+
+  // Hash exactly the bytes get_content reports as copied, never uninitialized memory.
+  int read_len = get_content(ffr, buffer, ffr->size);
+  if(read_len < 0) {
+    free(buffer);
+    free_flowfile(ffr);
+    return;
+  }
+
+  MD5_CTX context;
+  MD5_Init(&context);
+  MD5_Update(&context, buffer, (size_t)read_len);
+
+  free(buffer);
+
+  unsigned char digest[MD5_DIGEST_LENGTH];
+  MD5_Final(digest, &context);
+
+  // Two hex characters per digest byte plus the terminating NUL.
+  char md5string[2 * MD5_DIGEST_LENGTH + 1];
+  for(int i = 0; i < MD5_DIGEST_LENGTH; ++i) {
+    sprintf(&md5string[i*2], "%02x", (unsigned int)digest[i]);
+  }
+
+  add_attribute(ffr, prop_value, (void*)md5string, strlen(md5string));
+
+  transfer_to_relationship(ffr, ps, SUCCESS_RELATIONSHIP);
+
+  free_flowfile(ffr);
+}
+
+/**
+ * Prints the MD5 checksum of the file named by argv[1].
+ *
+ * Registers custom_processor_logic as the "md5proc" custom processor and runs
+ * the file through it as a standalone processor, with the processor told to
+ * store the digest in the "md5attr" attribute. It then prints that attribute.
+ *
+ * Exits with status 1 on a missing argument, an unusable path, or any failure
+ * while creating or running the processor. Returns 0 on success.
+ */
+int main(int argc, char **argv) {
+
+  if (argc < 2) {
+    fprintf(stderr, "Error: must run ./hash_file <file>\n");
+    exit(1);
+  }
+
+  char *file = argv[1];
+
+  // A failed stat() covers the missing-file case, and its result also tells us
+  // whether the path is a directory (which cannot be hashed).
+  struct stat stats;
+  if (stat(file, &stats) != 0) {
+    fprintf(stderr, "Error: %s doesn't exist!\n", file);
+    exit(1);
+  }
+
+  if (S_ISDIR(stats.st_mode)) {
+    fprintf(stderr, "Error: %s is a directory!\n", file);
+    exit(1);
+  }
+
+  add_custom_processor("md5proc", custom_processor_logic);
+
+  standalone_processor *standalone_proc = create_processor("md5proc");
+  if (standalone_proc == NULL) {
+    fprintf(stderr, "Error: failed to create the md5proc processor!\n");
+    exit(1);
+  }
+
+  const char * checksum_attr_name = "md5attr";
+
+  set_standalone_property(standalone_proc, "checksum_attr_name", checksum_attr_name);
+
+  flow_file_record * ffr = create_flowfile(file, strlen(file));
+  if (ffr == NULL) {
+    fprintf(stderr, "Error: failed to create a flow file for %s!\n", file);
+    free_standalone_processor(standalone_proc);
+    exit(1);
+  }
+
+  flow_file_record * new_ff = invoke_ff(standalone_proc, ffr);
+
+  free_flowfile(ffr);
+
+  // Only read attr.value after get_attribute reports success; when no flow file
+  // came back or the attribute was not set, there is no checksum to print.
+  attribute attr;
+  attr.key = checksum_attr_name;
+  attr.value = NULL;
+  attr.value_size = 0;
+  if (new_ff == NULL || get_attribute(new_ff, &attr) != 0 || attr.value == NULL) {
+    fprintf(stderr, "Error: failed to compute the checksum of %s!\n", file);
+    if (new_ff != NULL) {
+      free_flowfile(new_ff);
+    }
+    free_standalone_processor(standalone_proc);
+    exit(1);
+  }
+
+  // The processor added the digest with an explicit length (strlen, no NUL), so
+  // print exactly value_size characters starting at the first one.
+  printf("Checksum of %s is %.*s\n", file, (int)attr.value_size, (const char*)attr.value);
+
+  free_flowfile(new_ff);
+
+  free_standalone_processor(standalone_proc);
+
+  return 0;
+}
+
+#endif //OPENSSL_SUPPORT
\ No newline at end of file
diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp
index 756544e..a7a37df 100644
--- a/nanofi/src/api/nanofi.cpp
+++ b/nanofi/src/api/nanofi.cpp
@@ -371,9 +371,18 @@ int8_t remove_attribute(flow_file_record *ff, const char *key) {
 int get_content(const flow_file_record* ff, uint8_t* target, int size) {
+  // Copies up to `size` bytes of ff's content into `target` and returns the
+  // number of bytes copied (0 when ff or target is NULL). Content is read via the
+  // attached content repository when there is one, otherwise contentLocation is
+  // read directly as a file.
   NULL_CHECK(0, ff, target);
   auto content_repo = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ff->crp);
-  std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, *content_repo);
-  auto stream = (*content_repo)->read(claim);
-  return stream->read(target, size);
+  if(ff->crp && (*content_repo)) {
+    std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation,
+                                                                                           *content_repo);
+    auto stream = (*content_repo)->read(claim);
+    return stream->read(target, size);
+  }
+  // A non-positive size means nothing may be written to target; returning early
+  // also keeps the signed size out of the unsigned comparison below.
+  if(size <= 0) {
+    return 0;
+  }
+  file_buffer fb = file_to_buffer(ff->contentLocation);
+  if(fb.buffer == nullptr) {
+    return 0;
+  }
+  const size_t requested = static_cast<size_t>(size);
+  const size_t copy_size = requested < fb.file_len ? requested : static_cast<size_t>(fb.file_len);
+  memcpy(target, fb.buffer, copy_size);
+  free(fb.buffer);
+  // copy_size never exceeds size, so it always fits the int return type.
+  return static_cast<int>(copy_size);
 }
 
 /**
@@ -414,6 +423,7 @@ int transmit_flowfile(flow_file_record *ff, nifi_instance *instance) {
       file_buffer fb = file_to_buffer(ff->contentLocation);
       stream = std::make_shared<minifi::io::DataStream>();
       stream->writeData(fb.buffer, fb.file_len);
+      free(fb.buffer);
     }
   } else {
     //The flowfile has no content - create an empty stream to create empty content
@@ -606,13 +616,21 @@ flow_file_record *invoke_ff(standalone_processor* proc, const flow_file_record *
   plan->reset();
 
   if (input_ff) {
-    auto ff_data = std::make_shared<flowfile_input_params>();
     auto content_repo = static_cast<std::shared_ptr<minifi::core::ContentRepository> *>(input_ff->crp);
-    std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(input_ff->contentLocation,
-                                                                                           *content_repo);
-    ff_data->content_stream = (*content_repo)->read(claim);
-    ff_data->attributes = *static_cast<std::map<std::string, std::string> *>(input_ff->attributes);
+    auto ff_data = std::make_shared<flowfile_input_params>();
 
+    if(input_ff->crp && (*content_repo)) {
+      std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(input_ff->contentLocation,
+                                                                                             *content_repo);
+      ff_data->content_stream = (*content_repo)->read(claim);
+    } else {
+      ff_data->content_stream = std::make_shared<minifi::io::DataStream>();
+      file_buffer fb = file_to_buffer(input_ff->contentLocation);
+      ff_data->content_stream->writeData(fb.buffer, fb.file_len);
+      free(fb.buffer);
+    }
+
+    ff_data->attributes = *static_cast<std::map<std::string, std::string> *>(input_ff->attributes);
     plan->runNextProcessor(nullptr, ff_data);
   }
   while (plan->runNextProcessor()) {