You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2019/01/02 16:49:58 UTC

[GitHub] asfgit closed pull request #462: MINIFICPP-695 - NanoFi Examples appear to be broken

asfgit closed pull request #462: MINIFICPP-695 - NanoFi Examples appear to be broken
URL: https://github.com/apache/nifi-minifi-cpp/pull/462
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/nanofi/examples/transmit_flow.c b/nanofi/examples/transmit_flow.c
index e132c8b1..66bedde2 100644
--- a/nanofi/examples/transmit_flow.c
+++ b/nanofi/examples/transmit_flow.c
@@ -89,5 +89,16 @@ int main(int argc, char **argv) {
   //initialize_instance(instance);
   transfer_file_or_directory(instance,file);
 
+  //Create flowfile without content (just a set of attributes)
+  flow_file_record * record = create_ff_object_nc();
+
+  const char * custom_value = "transmitted value";
+
+  add_attribute(record, "transmitted attribute", (void*)custom_value, strlen(custom_value));
+
+  transmit_flowfile(record, instance);
+
+  free_flowfile(record);
+
   free_instance(instance);
 }
diff --git a/nanofi/include/api/nanofi.h b/nanofi/include/api/nanofi.h
index 06504922..68f0bd7a 100644
--- a/nanofi/include/api/nanofi.h
+++ b/nanofi/include/api/nanofi.h
@@ -267,15 +267,39 @@ flow_file_record *invoke_chunk(standalone_processor *proc, uint8_t *buf, uint64_
 int transfer(processor_session* session, flow *flow, const char *rel);
 
 /**
- * Creates a flow file object.
- * Will obtain the size of file
+ * Creates a flow file record based on a file
+ * @param file source file
+ * @param len length of the file name
+ * @return a flow file record or nullptr in case no flowfile was generated
  */
 flow_file_record* create_flowfile(const char *file, const size_t len);
 
+/**
+ * Creates a flow file record based on a file
+ * @param file source file
+ * @param len length of the file name
+ * @param size size of the file
+ * @return a flow file record or nullptr in case no flowfile was generated
+ */
 flow_file_record* create_ff_object(const char *file, const size_t len, const uint64_t size);
 
+/**
+ * Creates a flow file record based on a file, without attributes
+ * @attention attributes cannot be added later!
+ * @param file source file
+ * @param len length of the file name
+ * @param size size of the file
+ * @return a flow file record or nullptr in case no flowfile was generated
+ */
 flow_file_record* create_ff_object_na(const char *file, const size_t len, const uint64_t size);
 
+/**
+ * Creates a flow file record without content. Only attributes can be added.
+ * @attention content cannot be added later!
+ * @return a flow file record or nullptr in case no flowfile was generated
+ */
+flow_file_record* create_ff_object_nc();
+
 /**
  * Get incoming flow file. To be used in processor logic callbacks.
  * @param session current processor session
@@ -299,7 +323,7 @@ void free_flowfile(flow_file_record* ff);
  * @size size size of the data pointed by "value"
  * @return 0 in case of success, -1 otherwise (already existed)
  **/
-uint8_t add_attribute(flow_file_record*, const char *key, void *value, size_t size);
+int8_t add_attribute(flow_file_record*, const char *key, void *value, size_t size);
 
 /**
  * Updates an attribute (adds if it hasn't existed before)
@@ -316,7 +340,7 @@ void update_attribute(flow_file_record* ff, const char *key, void *value, size_t
  * @param caller_attribute attribute structure to provide name and get value, size
  * @return 0 in case of success, -1 otherwise (no such attribute)
  **/
-uint8_t get_attribute(const flow_file_record *ff, attribute *caller_attribute);
+int8_t get_attribute(const flow_file_record *ff, attribute *caller_attribute);
 
 /**
  * Get the quantity of attributes
@@ -347,7 +371,7 @@ int get_content(const flow_file_record* ff, uint8_t* target, int size);
  * @param name name of the attribute
  * @return 0 on success, -1 otherwise (doesn't exist)
  **/
-uint8_t remove_attribute(flow_file_record*, char *key);
+int8_t remove_attribute(flow_file_record*, const char * key);
 
 /****
  * ##################################################################
diff --git a/nanofi/include/core/cstructs.h b/nanofi/include/core/cstructs.h
index d85c856d..9a785448 100644
--- a/nanofi/include/core/cstructs.h
+++ b/nanofi/include/core/cstructs.h
@@ -128,4 +128,9 @@ typedef enum FS {
 
 typedef void (processor_logic)(processor_session*, processor_context *);
 
+typedef struct file_buffer {
+  uint8_t * buffer;
+  uint64_t file_len;
+} file_buffer;
+
 #endif /* LIBMINIFI_SRC_CAPI_CSTRUCTS_H_ */
diff --git a/nanofi/include/cxx/Instance.h b/nanofi/include/cxx/Instance.h
index 982f6d4b..13707103 100644
--- a/nanofi/include/cxx/Instance.h
+++ b/nanofi/include/cxx/Instance.h
@@ -122,7 +122,7 @@ class Instance {
     return content_repo_;
   }
 
-  void transfer(const std::shared_ptr<FlowFileRecord> &ff) {
+  void transfer(const std::shared_ptr<FlowFileRecord> &ff, const std::shared_ptr<minifi::io::DataStream> &stream = nullptr) {
     std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider = nullptr;
     auto processContext = std::make_shared<core::ProcessContext>(proc_node_, controller_service_provider, no_op_repo_, no_op_repo_, content_repo_);
     auto sessionFactory = std::make_shared<core::ProcessSessionFactory>(processContext);
@@ -132,7 +132,9 @@ class Instance {
     auto session = std::make_shared<core::ReflexiveSession>(processContext);
 
     session->add(ff);
-
+    if(stream) {
+      session->importFrom(*stream.get(), ff);
+    }
     rpg_->onTrigger(processContext, session);
   }
 
diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp
index aa957aca..e71f5555 100644
--- a/nanofi/src/api/nanofi.cpp
+++ b/nanofi/src/api/nanofi.cpp
@@ -35,6 +35,17 @@
 #include "io/DataStream.h"
 #include "core/cxxstructs.h"
 
+#define NULL_CHECK(ret_val, ...)                        \
+  do {                                                  \
+    const void *_p[] = { __VA_ARGS__ };                 \
+    int _i;                                             \
+    for (_i = 0; _i < sizeof(_p)/sizeof(*_p); _i++) {   \
+      if (_p[_i] == NULL) {                             \
+        return ret_val;                                 \
+      }                                                 \
+    }                                                   \
+  } while(0)
+
 using string_map = std::map<std::string, std::string>;
 
 class API_INITIALIZER {
@@ -42,6 +53,33 @@ class API_INITIALIZER {
   static int initialized;
 };
 
+//Just an internal utility func., not to be published via API!
+file_buffer file_to_buffer(const char *path) {
+  file_buffer fb;
+  fb.buffer = nullptr;
+  fb.file_len = 0;
+
+  NULL_CHECK(fb, path);
+  FILE *fileptr;
+  uint8_t *buffer;
+  uint64_t filelen;
+
+  fileptr = fopen(path, "rb");
+  NULL_CHECK(fb, fileptr);
+
+  fseek(fileptr, 0, SEEK_END);
+  filelen = ftell(fileptr);
+  rewind(fileptr);
+
+  buffer = (uint8_t *)malloc((filelen+1)*sizeof(uint8_t)); // Enough memory for file + \0
+  fread(buffer, filelen, 1, fileptr);
+  fclose(fileptr);
+
+  fb.buffer = buffer;
+  fb.file_len = filelen;
+  return fb;
+}
+
 int API_INITIALIZER::initialized = initialize_api();
 
 static nifi_instance* standalone_instance = nullptr;
@@ -88,6 +126,9 @@ nifi_instance *create_instance(const char *url, nifi_port *port) {
    * Since minifi::Instance is currently being used, then we need to use new in that case.
    */
   instance->instance_ptr = new minifi::Instance(url, port->port_id);
+
+  NULL_CHECK(nullptr, instance->instance_ptr);
+
   // may have to translate port ID here in the future
   // need reinterpret cast until we move to C for this module.
   instance->port.port_id = reinterpret_cast<char*>(malloc(strlen(port->port_id) + 1));
@@ -96,6 +137,7 @@ nifi_instance *create_instance(const char *url, nifi_port *port) {
 }
 
 standalone_processor *create_processor(const char *name) {
+  NULL_CHECK(nullptr, name);
   auto ptr = ExecutionPlan::createProcessor(name, name);
   if (!ptr) {
     return nullptr;
@@ -114,9 +156,7 @@ standalone_processor *create_processor(const char *name) {
 }
 
 void free_standalone_processor(standalone_processor* proc) {
-  if (proc == nullptr) {
-    return;
-  }
+  NULL_CHECK(, proc);
   ExecutionPlan::removeProcWithPlan(proc->getUUIDStr());
 
   if (ExecutionPlan::getProcWithPlanQty() == 0) {
@@ -154,9 +194,7 @@ void enable_async_c2(nifi_instance *instance, C2_Server *server, c2_stop_callbac
  * @return -1 when instance or key are null
  */
 int set_instance_property(nifi_instance *instance, const char *key, const char *value) {
-  if (nullptr == instance || nullptr == instance->instance_ptr || nullptr == key) {
-    return -1;
-  }
+  NULL_CHECK(-1, instance, key);
   auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
   minifi_instance_ref->getConfiguration()->set(key, value);
   return 0;
@@ -167,11 +205,10 @@ int set_instance_property(nifi_instance *instance, const char *key, const char *
  * @param instance nifi instance.
  */
 void free_instance(nifi_instance* instance) {
-  if (instance != nullptr) {
-    delete ((minifi::Instance*) instance->instance_ptr);
-    free(instance->port.port_id);
-    free(instance);
-  }
+  NULL_CHECK(, instance);
+  delete ((minifi::Instance*) instance->instance_ptr);
+  free(instance->port.port_id);
+  free(instance);
 }
 
 /**
@@ -179,15 +216,10 @@ void free_instance(nifi_instance* instance) {
  * @param file file to place into the flow file.
  */
 flow_file_record* create_flowfile(const char *file, const size_t len) {
-  flow_file_record *new_ff = (flow_file_record*) malloc(sizeof(flow_file_record));
-  new_ff->attributes = new string_map();
-  new_ff->contentLocation = (char*) malloc(sizeof(char) * (len + 1));
-  snprintf(new_ff->contentLocation, len + 1, "%s", file);
+  NULL_CHECK(nullptr, file);
   std::ifstream in(file, std::ifstream::ate | std::ifstream::binary);
-  // set the size of the flow file.
-  new_ff->size = in.tellg();
-  new_ff->keepContent = 0;
-  return new_ff;
+  uint64_t file_size = in.tellg();
+  return create_ff_object(file, len, file_size);
 }
 
 /**
@@ -195,38 +227,48 @@ flow_file_record* create_flowfile(const char *file, const size_t len) {
  * @param file file to place into the flow file.
  */
 flow_file_record* create_ff_object(const char *file, const size_t len, const uint64_t size) {
-  if (nullptr == file) {
-    return nullptr;
-  }
+  NULL_CHECK(nullptr, file);
   flow_file_record *new_ff = create_ff_object_na(file, len, size);
   new_ff->attributes = new string_map();
-  new_ff->ffp = 0;
   return new_ff;
 }
 
 flow_file_record* create_ff_object_na(const char *file, const size_t len, const uint64_t size) {
-  flow_file_record *new_ff = (flow_file_record*) malloc(sizeof(flow_file_record));
+  flow_file_record *new_ff = (flow_file_record *) malloc(sizeof(flow_file_record));
   new_ff->attributes = nullptr;
-  new_ff->contentLocation = (char*) malloc(sizeof(char) * (len + 1));
-  snprintf(new_ff->contentLocation, len + 1, "%s", file);
-  // set the size of the flow file.
-  new_ff->size = size;
+  if (file != nullptr) {
+    new_ff->contentLocation = (char *) malloc(sizeof(char) * (len + 1));
+    snprintf(new_ff->contentLocation, len + 1, "%s", file);
+    // set the size of the flow file.
+    new_ff->size = size;
+  } else {
+    new_ff->contentLocation = nullptr;
+    new_ff->size = 0;
+  }
   new_ff->crp = static_cast<void*>(new std::shared_ptr<minifi::core::ContentRepository>);
+  new_ff->ffp = nullptr;
   new_ff->keepContent = 0;
   return new_ff;
 }
+
+flow_file_record* create_ff_object_nc() {
+  flow_file_record* new_ff = create_ff_object_na(nullptr, 0, 0);
+  new_ff->attributes = new string_map();
+  return new_ff;
+}
 /**
  * Reclaims memory associated with a flow file object
  * @param ff flow file record.
  */
 void free_flowfile(flow_file_record *ff) {
-  if (ff == nullptr) {
-    return;
-  }
-  auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ff->crp);
-  if (content_repo_ptr->get() && (ff->keepContent == 0)) {
-    std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, *content_repo_ptr);
-    (*content_repo_ptr)->remove(claim);
+  NULL_CHECK(, ff);
+  if (ff->crp){
+    auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ff->crp);
+    if((*content_repo_ptr) && (ff->keepContent == 0)) {
+      auto claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, *content_repo_ptr);
+      (*content_repo_ptr)->remove(claim);
+    }
+    delete content_repo_ptr;
   }
   if (ff->ffp == nullptr) {
     auto map = static_cast<string_map*>(ff->attributes);
@@ -237,7 +279,6 @@ void free_flowfile(flow_file_record *ff) {
   }
   free(ff->contentLocation);
   free(ff);
-  delete content_repo_ptr;
 }
 
 /**
@@ -248,7 +289,9 @@ void free_flowfile(flow_file_record *ff) {
  * @param size size of value
  * @return 0 or -1 based on whether the attributed existed previously (-1) or not (0)
  */
-uint8_t add_attribute(flow_file_record *ff, const char *key, void *value, size_t size) {
+int8_t add_attribute(flow_file_record *ff, const char *key, void *value, size_t size) {
+  NULL_CHECK(-1, ff, key, value);
+  NULL_CHECK(-1, ff->attributes);
   auto attribute_map = static_cast<string_map*>(ff->attributes);
   const auto& ret = attribute_map->insert(std::pair<std::string, std::string>(key, std::string(static_cast<char*>(value), size)));
   return ret.second ? 0 : -1;
@@ -262,6 +305,8 @@ uint8_t add_attribute(flow_file_record *ff, const char *key, void *value, size_t
  * @param size size of value
  */
 void update_attribute(flow_file_record *ff, const char *key, void *value, size_t size) {
+  NULL_CHECK(, ff, key);
+  NULL_CHECK(, ff->attributes);
   auto attribute_map = static_cast<string_map*>(ff->attributes);
   (*attribute_map)[key] = std::string(static_cast<char*>(value), size);
 }
@@ -273,14 +318,10 @@ void update_attribute(flow_file_record *ff, const char *key, void *value, size_t
  * @param caller_attribute caller supplied object in which we will copy the data ptr
  * @return 0 if successful, -1 if the key does not exist
  */
-uint8_t get_attribute(const flow_file_record * ff, attribute * caller_attribute) {
-  if (ff == nullptr) {
-    return -1;
-  }
+int8_t get_attribute(const flow_file_record * ff, attribute * caller_attribute) {
+  NULL_CHECK(-1, ff, caller_attribute);
+  NULL_CHECK(-1, ff->attributes, caller_attribute->key);
   auto attribute_map = static_cast<string_map*>(ff->attributes);
-  if (!attribute_map) {
-    return -1;
-  }
   auto find = attribute_map->find(caller_attribute->key);
   if (find != attribute_map->end()) {
     caller_attribute->value = static_cast<void*>(const_cast<char*>(find->second.data()));
@@ -291,21 +332,16 @@ uint8_t get_attribute(const flow_file_record * ff, attribute * caller_attribute)
 }
 
 int get_attribute_quantity(const flow_file_record *ff) {
-  if (ff == nullptr) {
-    return 0;
-  }
+  NULL_CHECK(0, ff);
+  NULL_CHECK(0, ff->attributes);
   auto attribute_map = static_cast<string_map*>(ff->attributes);
   return attribute_map ? attribute_map->size() : 0;
 }
 
 int get_all_attributes(const flow_file_record* ff, attribute_set *target) {
-  if (ff == nullptr) {
-    return 0;
-  }
+  NULL_CHECK(0, ff, target);
+  NULL_CHECK(0, ff->attributes, target->attributes);
   auto attribute_map = static_cast<string_map*>(ff->attributes);
-  if (!attribute_map || attribute_map->empty()) {
-    return 0;
-  }
   int i = 0;
   for (const auto& kv : *attribute_map) {
     if (i >= target->size) {
@@ -325,15 +361,15 @@ int get_all_attributes(const flow_file_record* ff, attribute_set *target) {
  * @param key key to remove
  * @return 0 if removed, -1 otherwise
  */
-uint8_t remove_attribute(flow_file_record *ff, const char *key) {
+int8_t remove_attribute(flow_file_record *ff, const char *key) {
+  NULL_CHECK(-1, ff, key);
+  NULL_CHECK(-1, ff->attributes);
   auto attribute_map = static_cast<string_map*>(ff->attributes);
   return attribute_map->erase(key) - 1;  // erase by key returns the number of elements removed (0 or 1)
 }
 
 int get_content(const flow_file_record* ff, uint8_t* target, int size) {
-  if (ff == nullptr || target == nullptr || size == 0) {
-    return 0;
-  }
+  NULL_CHECK(0, ff, target);
   auto content_repo = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ff->crp);
   std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, *content_repo);
   auto stream = (*content_repo)->read(claim);
@@ -346,48 +382,67 @@ int get_content(const flow_file_record* ff, uint8_t* target, int size) {
  * @param instance nifi instance structure
  */
 int transmit_flowfile(flow_file_record *ff, nifi_instance *instance) {
+  static string_map empty_attribute_map;
+
+  NULL_CHECK(-1, ff, instance);
   auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
   // in the unlikely event the user forgot to initialize the instance, we shall do it for them.
   if (UNLIKELY(minifi_instance_ref->isRPGConfigured() == false)) {
     minifi_instance_ref->setRemotePort(instance->port.port_id);
   }
 
-  auto attribute_map = static_cast<string_map*>(ff->attributes);
+  string_map& attribute_map = empty_attribute_map;
+  if(ff->attributes) {
+    attribute_map = *(static_cast<string_map *>(ff->attributes));
+  }
 
   auto no_op = minifi_instance_ref->getNoOpRepository();
 
   auto content_repo = minifi_instance_ref->getContentRepository();
 
-  std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, content_repo);
-  claim->increaseFlowFileRecordOwnedCount();
-  claim->increaseFlowFileRecordOwnedCount();
+  std::shared_ptr<minifi::ResourceClaim> claim = nullptr;
+  std::shared_ptr<minifi::io::DataStream> stream = nullptr; //Used when content is not in content repo
+
+  if(ff->contentLocation) {
+    auto ff_content_repo_ptr = (static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ff->crp));
+    if (ff->crp && (*ff_content_repo_ptr)) {
+      content_repo = *ff_content_repo_ptr;
+      claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, content_repo);
+      claim->increaseFlowFileRecordOwnedCount();
+      claim->increaseFlowFileRecordOwnedCount();
+    } else {
+      file_buffer fb = file_to_buffer(ff->contentLocation);
+      stream = std::make_shared<minifi::io::DataStream>();
+      stream->writeData(fb.buffer, fb.file_len);
+    }
+  } else {
+    //The flowfile has no content - create an empty stream to create empty content
+    stream = std::make_shared<minifi::io::DataStream>();
+  }
 
-  auto ffr = std::make_shared<minifi::FlowFileRecord>(no_op, content_repo, *attribute_map, claim);
+  auto ffr = std::make_shared<minifi::FlowFileRecord>(no_op, content_repo, attribute_map, claim);
   ffr->addAttribute("nanofi.version", API_VERSION);
   ffr->setSize(ff->size);
 
   std::string port_uuid = instance->port.port_id;
 
-  minifi_instance_ref->transfer(ffr);
+  minifi_instance_ref->transfer(ffr, stream);
 
   return 0;
 }
 
 flow * create_new_flow(nifi_instance * instance) {
-  if (nullptr == instance || nullptr == instance->instance_ptr) {
-    return nullptr;
-  }
+  NULL_CHECK(nullptr, instance);
   auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
   flow * area = static_cast<flow*>(malloc(1*sizeof(flow)));
-  if(area == nullptr) {
-    return nullptr;
-  }
+  NULL_CHECK(nullptr, area);
 
   flow *new_flow = new(area) flow(minifi_instance_ref->getContentRepository(), minifi_instance_ref->getNoOpRepository(), minifi_instance_ref->getNoOpRepository());
   return new_flow;
 }
 
 flow *create_flow(nifi_instance *instance, const char *first_processor) {
+  NULL_CHECK(nullptr, instance);
   auto new_flow = create_new_flow(instance);
 
   if(new_flow != nullptr && first_processor != nullptr && strlen(first_processor) > 0) {
@@ -423,9 +478,7 @@ flow * create_getfile(nifi_instance * instance, flow * parent_flow, GetFileConfi
 }
 
 processor *add_processor(flow *flow, const char *processor_name) {
-  if (nullptr == flow || nullptr == processor_name) {
-    return nullptr;
-  }
+  NULL_CHECK(nullptr, flow, processor_name);
 
   auto proc = flow->addProcessor(processor_name, processor_name, core::Relationship("success", "description"), flow->hasProcessor());
   return static_cast<processor*>(proc.get());
@@ -440,25 +493,19 @@ int set_failure_strategy(flow *flow, FailureStrategy strategy) {
 }
 
 int set_propery_internal(core::Processor* proc, const char *name, const char *value) {
-  if (name != nullptr && value != nullptr) {
-    bool success = proc->setProperty(name, value) || (proc->supportsDynamicProperties() && proc->setDynamicProperty(name, value));
-    return success ? 0 : -2;
-  }
-  return -1;
+  NULL_CHECK(-1, proc, name, value);
+  bool success = proc->setProperty(name, value) || (proc->supportsDynamicProperties() && proc->setDynamicProperty(name, value));
+  return success ? 0 : -2;
 }
 
 int set_property(processor *proc, const char *name, const char *value) {
-  if (proc != nullptr) {
-    return set_propery_internal(proc, name, value);
-  }
-  return -1;
+  NULL_CHECK(-1, proc);
+  return set_propery_internal(proc, name, value);
 }
 
 int set_standalone_property(standalone_processor *proc, const char *name, const char *value) {
-  if (proc != nullptr) {
-    return set_propery_internal(proc, name, value);
-  }
-  return -1;
+  NULL_CHECK(-1, proc);
+  return set_propery_internal(proc, name, value);
 }
 
 uint8_t get_property(const processor_context * context, const char * name, char * buffer, size_t size) {
@@ -473,21 +520,8 @@ uint8_t get_property(const processor_context * context, const char * name, char
   return 0;
 }
 
-char * get_property(const processor_context *  context, const char * name) {
-  std::string value;
-  if(!context->getDynamicProperty(name, value)) {
-    return nullptr;
-  }
-  size_t len = value.length();
-  char * ret_val = (char*)malloc((len +1) * sizeof(char));
-  strncpy(ret_val, value.data(), len);
-  ret_val[len] = '\0';
-  return ret_val;
-}
-
 int free_flow(flow *flow) {
-  if (flow == nullptr)
-    return -1;
+  NULL_CHECK(-1, flow);
   flow->~flow();
   free(flow);
   return 0;
@@ -619,24 +653,12 @@ flow_file_record *invoke_chunk(standalone_processor* proc, uint8_t* buf, uint64_
 }
 
 flow_file_record *invoke_file(standalone_processor* proc, const char* path) {
-  FILE *fileptr;
-  uint8_t *buffer;
-  uint64_t filelen;
-
-  fileptr = fopen(path, "rb");
-  if (fileptr == nullptr) {
-    return nullptr;
-  }
-  fseek(fileptr, 0, SEEK_END);
-  filelen = ftell(fileptr);
-  rewind(fileptr);
-
-  buffer = (uint8_t *)malloc((filelen+1)*sizeof(uint8_t)); // Enough memory for file + \0
-  fread(buffer, filelen, 1, fileptr);
-  fclose(fileptr);
+  NULL_CHECK(nullptr, path, proc);
+  auto fb = file_to_buffer(path);
+  NULL_CHECK(nullptr, fb.buffer);
 
-  flow_file_record* ffr = invoke_chunk(proc, buffer, filelen);
-  free(buffer);
+  flow_file_record* ffr = invoke_chunk(proc, fb.buffer, fb.file_len);
+  free(fb.buffer);
   return ffr;
 }
 
diff --git a/nanofi/tests/CAPITests.cpp b/nanofi/tests/CAPITests.cpp
index ca24ba88..bc881a30 100644
--- a/nanofi/tests/CAPITests.cpp
+++ b/nanofi/tests/CAPITests.cpp
@@ -450,4 +450,92 @@ TEST_CASE("Test custom processor", "[TestCutomProcessor]") {
   flow_file_record *record = get_next_flow_file(instance, test_flow);
 
   REQUIRE(record != nullptr);
-}
\ No newline at end of file
+}
+
+TEST_CASE("C API robustness test", "[TestRobustness]") {
+  free_flow(nullptr);
+  free_standalone_processor(nullptr);
+  free_instance(nullptr);
+
+  REQUIRE(create_processor(nullptr) == nullptr);
+
+  standalone_processor *standalone_proc = create_processor("GetFile");
+  REQUIRE(standalone_proc != nullptr);
+
+  REQUIRE(set_property(nullptr, "prop_name", "prop_value") == -1);
+
+  REQUIRE(set_standalone_property(standalone_proc, nullptr, "prop_value") == -1);
+
+  REQUIRE(set_standalone_property(standalone_proc, "prop_name", nullptr) == -1);
+
+  free_standalone_processor(standalone_proc);
+
+  const char *file_path = "path/to/file";
+
+  flow_file_record *ffr = create_ff_object_na(file_path, strlen(file_path), 0);
+
+  const char *custom_value = "custom value";
+
+  REQUIRE(add_attribute(nullptr, "custom attribute", (void *) custom_value, strlen(custom_value)) == -1);
+
+  REQUIRE(add_attribute(ffr, "custom attribute", (void *) custom_value, strlen(custom_value)) == -1);
+
+  REQUIRE(add_attribute(ffr, nullptr, (void *) custom_value, strlen(custom_value)) == -1);
+
+  REQUIRE(add_attribute(ffr, "custom attribute", nullptr, 0) == -1);
+
+  update_attribute(nullptr, "custom attribute", (void *) custom_value, strlen(custom_value));
+
+  update_attribute(ffr, nullptr, (void *) custom_value, strlen(custom_value));
+
+  attribute attr;
+  attr.key = "filename";
+  attr.value_size = 0;
+  REQUIRE(get_attribute(ffr, &attr) == -1);
+
+  REQUIRE(get_attribute(nullptr, &attr) == -1);
+
+  REQUIRE(get_attribute(ffr, nullptr) == -1);
+
+  REQUIRE(get_attribute_quantity(nullptr) == 0);
+
+  REQUIRE(get_attribute_quantity(ffr) == 0);
+
+  attribute_set attr_set;
+  attr_set.size = 3;
+  attr_set.attributes = (attribute *) malloc(attr_set.size * sizeof(attribute));  // NOLINT
+
+  REQUIRE(get_all_attributes(nullptr, &attr_set) == 0);
+
+  REQUIRE(get_all_attributes(ffr, &attr_set) == 0);
+
+  REQUIRE(get_all_attributes(ffr, nullptr) == 0);
+
+  free(attr_set.attributes);
+
+  REQUIRE(remove_attribute(nullptr, "key") == -1);
+
+  REQUIRE(remove_attribute(ffr, "key") == -1);
+
+  REQUIRE(remove_attribute(ffr, nullptr) == -1);
+
+  auto instance = create_instance_obj();
+
+  REQUIRE(transmit_flowfile(nullptr, instance) == -1);
+
+  REQUIRE(transmit_flowfile(ffr, nullptr) == -1);
+
+  REQUIRE(create_new_flow(nullptr) == nullptr);
+
+  flow *test_flow = create_new_flow(instance);
+
+  REQUIRE(test_flow != nullptr);
+
+  REQUIRE(add_processor(nullptr, "GetFile") == nullptr);
+
+  REQUIRE(add_processor(test_flow, nullptr) == nullptr);
+
+  free_flow(test_flow);
+
+  free_instance(instance);
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services