You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2019/01/02 16:49:58 UTC

[GitHub] asfgit closed pull request #461: MINIFICPP-692 - Ensure calls to get a flowfile are consistent across …

asfgit closed pull request #461: MINIFICPP-692 - Ensure calls to get a flowfile are consistent across …
URL: https://github.com/apache/nifi-minifi-cpp/pull/461
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub no longer displays it once the request is merged:

diff --git a/nanofi/include/api/nanofi.h b/nanofi/include/api/nanofi.h
index 06504922..04f43e56 100644
--- a/nanofi/include/api/nanofi.h
+++ b/nanofi/include/api/nanofi.h
@@ -131,7 +131,7 @@ flow *create_getfile(nifi_instance *instance, flow *parent, GetFileConfig *c);
  **/
 processor *add_processor(flow * flow, const char * name);
 
-processor *add_python_processor(flow *, void (*ontrigger_callback)(processor_session *session));
+processor *add_python_processor(flow *, processor_logic* logic);
 
 /**
  * Create a standalone instance of the given processor.
@@ -233,8 +233,6 @@ flow_file_record *get_next_flow_file(nifi_instance *, flow *);
  **/
 size_t get_flow_files(nifi_instance * instance, flow * flow, flow_file_record ** flowfiles, size_t size);
 
-flow_file_record *get(nifi_instance *,flow *, processor_session *);
-
 /**
  * Invoke a standalone processor without input data.
  * The processor is expected to generate flow file.
@@ -282,7 +280,7 @@ flow_file_record* create_ff_object_na(const char *file, const size_t len, const
  * @param context current processor context
  * @return a flow file record or nullptr in case there is none in the session
  **/
-flow_file_record* get_flowfile(processor_session* session, processor_context* context);
+flow_file_record* get(processor_session *session, processor_context *context);
 
 
 /**
@@ -299,7 +297,7 @@ void free_flowfile(flow_file_record* ff);
  * @size size size of the data pointed by "value"
  * @return 0 in case of success, -1 otherwise (already existed)
  **/
-uint8_t add_attribute(flow_file_record*, const char *key, void *value, size_t size);
+uint8_t add_attribute(flow_file_record* ff, const char *key, void *value, size_t size);
 
 /**
  * Updates an attribute (adds if it hasn't existed before)
diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp
index aa957aca..a58f3253 100644
--- a/nanofi/src/api/nanofi.cpp
+++ b/nanofi/src/api/nanofi.cpp
@@ -397,14 +397,14 @@ flow *create_flow(nifi_instance *instance, const char *first_processor) {
   return new_flow;
 }
 
-processor *add_python_processor(flow *flow, void (*ontrigger_callback)(processor_session *)) {
-  if (nullptr == flow || nullptr == ontrigger_callback) {
+processor *add_python_processor(flow *flow, processor_logic* logic) {
+  if (nullptr == flow || nullptr == logic) {
     return nullptr;
   }
-  auto lambda = [ontrigger_callback](core::ProcessSession *ps) {
-    ontrigger_callback(static_cast<processor_session*>(ps));  //Meh, sorry for this
+  auto lambda = [logic](core::ProcessSession *ps, core::ProcessContext *cx) {
+    logic(static_cast<processor_session*>(ps), static_cast<processor_context*>(cx));  //Meh, sorry for this
   };
-  auto proc = flow->addSimpleCallback(nullptr, lambda);
+  auto proc = flow->addCallback(nullptr, lambda);
   return static_cast<processor*>(proc.get());
 }
 
@@ -518,7 +518,7 @@ flow_file_record* flowfile_to_record(std::shared_ptr<core::FlowFile> ff, Executi
   return flowfile_to_record(ff, plan->getContentRepo());
 }
 
-flow_file_record* get_flowfile(processor_session* session, processor_context* context) {
+flow_file_record* get(processor_session *session, processor_context *context) {
   auto ff = session->get();
   if(!ff) {
     return nullptr;
@@ -554,14 +554,6 @@ size_t get_flow_files(nifi_instance *instance, flow *flow, flow_file_record **ff
   return i;
 }
 
-flow_file_record * get(nifi_instance * instance, flow * flow, processor_session * session) {
-  if (nullptr == instance || nullptr == flow || nullptr == session)
-    return nullptr;
-  auto ff = session->get();
-  flow->setNextFlowFile(ff);
-  return flowfile_to_record(ff, flow);
-}
-
 flow_file_record *invoke(standalone_processor* proc) {
   return invoke_ff(proc, nullptr);
 }
diff --git a/nanofi/tests/CAPITests.cpp b/nanofi/tests/CAPITests.cpp
index ca24ba88..35cbfc25 100644
--- a/nanofi/tests/CAPITests.cpp
+++ b/nanofi/tests/CAPITests.cpp
@@ -55,7 +55,7 @@ void big_failure_counter(flow_file_record * fr) {
 }
 
 void custom_processor_logic(processor_session * ps, processor_context * ctx) {
-  flow_file_record * ffr = get_flowfile(ps, ctx);
+  flow_file_record * ffr = get(ps, ctx);
   REQUIRE(ffr != nullptr);
   uint8_t * buffer = (uint8_t*)malloc(ffr->size* sizeof(uint8_t));
   get_content(ffr, buffer, ffr->size);
diff --git a/python/getFile.py b/python/getFile.py
index 3d2b9d07..fd3e3357 100644
--- a/python/getFile.py
+++ b/python/getFile.py
@@ -23,16 +23,20 @@
 
 
 class GetFilePrinterProcessor(PyProcessor):
-    def __init__(self,instance, minifi, flow):
-        PyProcessor.__init__(self,instance,minifi,flow)
+    def __init__(self, minifi, flow):
+        PyProcessor.__init__(self, minifi, flow)
         self._callback = None
 
     def _onTriggerCallback(self):
-        def onTrigger(session):
-            flow_file = self.get(session)
+        def onTrigger(session, context):
+            flow_file = self.get(session, context)
             if flow_file:
-              flow_file.add_attribute("python_test","value")
-              self.transfer(session,flow_file, "success")
+                if flow_file.add_attribute("python_test","value"):
+                    print("Add attribute succeeded")
+                if not flow_file.add_attribute("python_test","value2"):
+                    print("Cannot add the same attribute twice!")
+                print ("original file name: " + flow_file.get_attribute("filename"))
+                self.transfer(session, flow_file, "success")
         return CALLBACK(onTrigger)
 
 
diff --git a/python/minifi/__init__.py b/python/minifi/__init__.py
index 7352eb7b..b6de1d66 100644
--- a/python/minifi/__init__.py
+++ b/python/minifi/__init__.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 from ctypes import cdll
 import ctypes
-from abc import  abstractmethod
+from abc import abstractmethod
 
 
 
@@ -36,13 +36,22 @@ class CFlowFile(ctypes.Structure):
                  ('attributes', ctypes.c_void_p),
                  ('ffp', ctypes.c_void_p)]
 
+class CAttribute(ctypes.Structure):
+    _fields_ = [('key', ctypes.c_char_p),
+                ('value', ctypes.c_void_p),
+                ('value_size', ctypes.c_size_t)]
+
 class CProcessor(ctypes.Structure):
     _fields_ = [('processor_ptr', ctypes.c_void_p)]
 
 class CProcessSession(ctypes.Structure):
     _fields_ = [('process_session', ctypes.c_void_p)]
 
-CALLBACK = ctypes.CFUNCTYPE(None, ctypes.POINTER(CProcessSession))
+class CProcessContext(ctypes.Structure):
+    _fields_ = [('process_context', ctypes.c_void_p)]
+
+
+CALLBACK = ctypes.CFUNCTYPE(None, ctypes.POINTER(CProcessSession), ctypes.POINTER(CProcessContext))
 
 class Processor(object):
     def __init__(self, cprocessor, minifi):
@@ -54,17 +63,16 @@ def set_property(self, name, value):
         self._minifi.set_property( self._proc, name.encode("UTF-8"), value.encode("UTF-8"))
 
 class PyProcessor(object):
-    def __init__(self, instance, minifi, flow):
+    def __init__(self, minifi, flow):
         super(PyProcessor, self).__init__()
-        self._instance = instance
         self._minifi = minifi
         self._flow = flow
 
     def setBase(self, proc):
         self._proc = proc
 
-    def get(self, session):
-        ff = self._minifi.get(self._instance.get_instance(),self._flow, session)
+    def get(self, session, context):
+        ff = self._minifi.get(session, context)
         if ff:
             return FlowFile(self._minifi, ff)
         else:
@@ -102,9 +110,22 @@ def __init__(self, minifi, ff):
         self._minifi = minifi
         self._ff = ff
 
+    def get_attribute(self, name):
+        attr = CAttribute(name.encode("UTF-8"), 0, 0)
+        if self._minifi.get_attribute(self._ff, attr) != 0:
+            return ""
+        if attr.value_size > 0:
+            return ctypes.cast(attr.value, ctypes.c_char_p).value.decode("ascii")
+        return ""
+
     def add_attribute(self, name, value):
         vallen = len(value)
-        self._minifi.add_attribute(self._ff, name.encode("UTF-8"), value.encode("UTF-8"), vallen)
+        ret = self._minifi.add_attribute(self._ff, name.encode("UTF-8"), value.encode("UTF-8"), vallen)
+        return True if ret == 0 else False
+
+    def update_attribute(self, name, value):
+        vallen = len(value)
+        self._minifi.update_attribute(self._ff, name.encode("UTF-8"), value.encode("UTF-8"), vallen)
 
     def get_instance(self):
         return self._ff
@@ -138,7 +159,7 @@ def __init__(self, dll_file, url, port):
         self._minifi.transmit_flowfile.argtypes = [ctypes.POINTER(CFlowFile) , ctypes.POINTER(NIFI_STRUCT) ]
         self._minifi.transmit_flowfile.restype = ctypes.c_int
         """ get ff """
-        self._minifi.get.argtypes = [ctypes.POINTER(NIFI_STRUCT) , ctypes.POINTER(CFlow), ctypes.POINTER(CProcessSession) ]
+        self._minifi.get.argtypes = [ctypes.POINTER(CProcessSession), ctypes.POINTER(CProcessContext) ]
         self._minifi.get.restype = ctypes.POINTER(CFlowFile)
         """ add python processor """
         self._minifi.add_python_processor.argtypes = [ctypes.POINTER(CFlow) , ctypes.c_void_p ]
@@ -150,6 +171,14 @@ def __init__(self, dll_file, url, port):
         self._minifi.add_attribute.argtypes = [ctypes.POINTER(CFlowFile), ctypes.c_char_p, ctypes.c_char_p, ctypes.c_int ]
         self._minifi.add_attribute.restype = ctypes.c_int
 
+        """ update (overwrite) attribute to ff """
+        self._minifi.update_attribute.argtypes = [ctypes.POINTER(CFlowFile), ctypes.c_char_p, ctypes.c_char_p, ctypes.c_int ]
+        self._minifi.update_attribute.restype = None
+
+        """ get attribute of ff """
+        self._minifi.get_attribute.argtypes = [ctypes.POINTER(CFlowFile), ctypes.POINTER(CAttribute) ]
+        self._minifi.get_attribute.restype = ctypes.c_int
+
         self._minifi.init_api.argtype = ctypes.c_char_p
         self._minifi.init_api.restype = ctypes.c_int
         self._minifi.init_api(dll_file.encode("UTF-8"))
@@ -178,7 +207,7 @@ def add_processor(self, processor):
         return Processor(proc,self._minifi)
 
     def create_python_processor(self, module, processor):
-        m =  getattr(module,processor)(self._instance,self._minifi,self._flow)
+        m =  getattr(module, processor)(self._minifi, self._flow)
         proc = self._minifi.add_python_processor(self._flow, m.getTriggerCallback())
         m.setBase(proc)
         return m


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services