Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/04/21 11:12:59 UTC

[GitHub] [nifi-minifi-cpp] james94 opened a new pull request #763: MINIFI-1199: Integrate H2O Driverless AI in MiNiFi

james94 opened a new pull request #763:
URL: https://github.com/apache/nifi-minifi-cpp/pull/763


   **MiNiFi C++ and H2O Driverless AI Integration** via Custom Python Processors:
   
   Integrates MiNiFi C++ with H2O's Driverless AI by using Driverless AI's Python Scoring Pipeline and MiNiFi's custom Python processors. The Python processors execute the Python Scoring Pipeline scorer to do batch scoring and real-time scoring for one or more predicted labels on the test data in the incoming flow file content. I would like to contribute these processors to MiNiFi C++ as a new feature.
   
   **3 custom python processors** created for MiNiFi:
   
   **H2oPspScoreRealTime** - Executes H2O Driverless AI's Python Scoring Pipeline to do interactive (real-time) scoring on an individual row or a list of rows of test data within each incoming flow file.
   
   **H2oPspScoreBatches** - Executes H2O Driverless AI's Python Scoring Pipeline to do batch scoring on a frame of data within each incoming flow file.
   
   **ConvertDsToCsv** - Converts the data source of the incoming flow file to CSV. (A rough sketch of the shape of such a processor is shown right below.)
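   
   To give a feel for how these processors are structured, here is a minimal sketch of a ConvertDsToCsv-style processor. This is not the exact implementation in the PR: the `session.write` call, the `REL_SUCCESS` relationship (assumed to be provided by the MiNiFi scripting environment), and the JSON-to-CSV conversion are assumptions for illustration only. It follows the same read-callback/write-callback pattern used by the two scoring processors in this PR:
   
   ```python
   import codecs
   import io
   
   import pandas as pd
   
   class ContentExtract(object):
       """ Read callback: captures the incoming flow file content as text. """
       def __init__(self):
           self.content = None
   
       def process(self, input_stream):
           self.content = codecs.getreader('utf-8')(input_stream).read()
           return len(self.content)
   
   class ContentWrite(object):
       """ Write callback: writes the converted CSV text into the flow file. """
       def __init__(self, data):
           self.content = data
   
       def process(self, output_stream):
           codecs.getwriter('utf-8')(output_stream).write(self.content)
           return len(self.content)
   
   def describe(processor):
       processor.setDescription("Converts the data source in the incoming flow file content to CSV.")
   
   def onInitialize(processor):
       processor.setSupportsDynamicProperties()
   
   def onTrigger(context, session):
       flow_file = session.get()
       if flow_file is not None:
           read_cb = ContentExtract()
           session.read(flow_file, read_cb)
           # JSON is used here only as an example source format; the real processor
           # would detect or be configured with the incoming data source type.
           df = pd.read_json(io.StringIO(read_cb.content))
           write_cb = ContentWrite(df.to_csv(index=False))
           session.write(flow_file, write_cb)
           # REL_SUCCESS is assumed to be provided by the MiNiFi scripting environment.
           session.transfer(flow_file, REL_SUCCESS)
   ```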
   
   I do have a question about Python processors. I notice the ExecutePythonProcessor has a Module Directory property for passing in paths to files and/or directories that contain modules required by the script. What if there is a module name that the user needs to enter as a processor property, so that the script's import of that module is updated before that line of code is executed? Is that possible, and if so, how would one do that? (A rough sketch of what I have in mind follows.)
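   
   For example, something along these lines is what I am picturing. This is only a sketch under assumptions: the "Scoring Module Name" property is hypothetical, and the `processor.addProperty` / `context.getProperty` signatures are assumed to match what the other custom Python processors use.
   
   ```python
   import importlib
   
   def onInitialize(processor):
       # Hypothetical property holding the name of the experiment-specific scoring module.
       # The addProperty signature (name, description, default, required, EL support) is an assumption.
       processor.addProperty("Scoring Module Name",
                             "Name of the Python module that provides the Scorer class",
                             "", True, False)
   
   def onTrigger(context, session):
       # Read the user-configured module name and import it at runtime,
       # instead of hard-coding the experiment-specific module in an import statement.
       module_name = context.getProperty("Scoring Module Name")
       scoring_module = importlib.import_module(module_name)
       scorer = scoring_module.Scorer()
       # ... the rest of the scoring logic would follow here ...
   ```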
   
   Is there a place in the repo where I can add config-batch-scoring.yml and config-interactive-scoring.yml files to show users how to get started with these processors quickly?
   
   Created the Jira ticket associated with this PR: https://issues.apache.org/jira/browse/MINIFICPP-1199
   
   - Note: more information about the three Python processors is available in the Jira link above.
   
   Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced
        in the commit message?
   
   - [ ] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   ### For code changes:
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the LICENSE file?
   - [ ] If applicable, have you updated the NOTICE file?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] phrocker commented on issue #763: MINIFI-1199: Integrate H2O Driverless AI in MiNiFi

Posted by GitBox <gi...@apache.org>.
phrocker commented on issue #763:
URL: https://github.com/apache/nifi-minifi-cpp/pull/763#issuecomment-617161556


   Thanks @james94  I'll take a look. 





[GitHub] [nifi-minifi-cpp] james94 commented on issue #763: MINIFI-1199: Integrate H2O Driverless AI PSP in MiNiFi

Posted by GitBox <gi...@apache.org>.
james94 commented on issue #763:
URL: https://github.com/apache/nifi-minifi-cpp/pull/763#issuecomment-617880230


   @phrocker I moved the **ConvertDsToCsv** Python processor to the `h2o/` base directory since it is not only meant to be used with the two Python Scoring Pipeline processors, **H2oPspScoreRealTime** and **H2oPspScoreBatches**, but can also be used with the MOJO Scoring Pipeline processor, **H2oMojoPwScoring**. **ConvertDsToCsv** can also be used on its own, without the scoring pipeline processors, just to convert a variety of data sources to CSV.
   
   Note: I do have another PR for the **H2oMojoPwScoring** Python processor: [MINIFI-1201: Integrate H2O Driverless AI MSP in MiNiFi](https://github.com/apache/nifi-minifi-cpp/pull/766).
   
   I looked at the Travis failures and they look unrelated to this update to the PR.
   





[GitHub] [nifi-minifi-cpp] james94 commented on a change in pull request #763: MINIFI-1199: Integrate H2O Driverless AI PSP in MiNiFi

Posted by GitBox <gi...@apache.org>.
james94 commented on a change in pull request #763:
URL: https://github.com/apache/nifi-minifi-cpp/pull/763#discussion_r412711936



##########
File path: extensions/pythonprocessors/h2o/dai/psp/H2oPspScoreBatches.py
##########
@@ -0,0 +1,98 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+    Install the following with pip
+
+    -- after downloading the python scoring pipeline from Driverless AI,
+       the following was needed for executing the Scorer to make predictions
+    pip install -r requirements.txt
+
+        This requirements.txt file includes pip packages that are available on
+        the internet for install, but there are some packages that only come with
+        the python scoring pipeline download, which include:
+    h2oaicore-1.8.4.1-cp36-cp36m-linux_x86_64.whl
+    scoring_h2oai_experiment_6a77d0a4_6a25_11ea_becf_0242ac110002-1.0.0-py3-none-any.whl
+"""
+import codecs
+import pandas as pd
+import datatable as dt
+from scipy.special._ufuncs import expit
+from scoring_h2oai_experiment_6a77d0a4_6a25_11ea_becf_0242ac110002 import Scorer
+
+def describe(processor):
+    """ describe what this processor does
+    """
+    processor.setDescription("Executes H2O's Python Scoring Pipeline to do batch \
+        scoring for one or more predicted label(s) on the tabular test data in the \
+        incoming flow file content.")
+
+def onInitialize(processor):
+    """ onInitialize is where you can set properties
+    """
+    processor.setSupportsDynamicProperties()
+
+class ContentExtract(object):
+    """ ContentExtract callback class is defined for reading streams of data through the session
+        and has a process function that accepts the input stream
+    """
+    def __init__(self):
+        self.content = None
+    
+    def process(self, input_stream):
+        """ Use codecs getReader to read that data
+        """
+        self.content = codecs.getreader('utf-8')(input_stream).read()
+        return len(self.content)
+
+class ContentWrite(object):
+    """ ContentWrite callback class is defined for writing streams of data through the session
+    """
+    def __init__(self, data):
+        self.content = data
+
+    def process(self, output_stream):
+        """ Use codecs getWriter to write data encoded to the stream
+        """
+        codecs.getwriter('utf-8')(output_stream).write(self.content)
+        return len(self.content)
+
+def onTrigger(context, session):
+    """ onTrigger is executed and passed processor context and session
+    """
+    flow_file = session.get()
+    if flow_file is not None:
+        read_cb = ContentExtract()
+        # read flow file tabular data content into read_cb.content data member
+        session.read(flow_file, read_cb)
+        # instantiate H2O's python scoring pipeline scorer
+        scorer = Scorer()
+        # load tabular data str into datatable
+        test_dt_frame = dt.Frame(read_cb.content)
+        # do batch scoring on test data in datatable frame, return pandas df with predicted labels
+        batch_scores_df = scorer.score_batch(test_dt_frame)
+        # convert df to str, then write to flow file
+        batch_scores_df_str = batch_scores_df.to_string()

Review comment:
       @phrocker I want to make one update to the **H2oPspScoreBatches.py** processor.
   
   I noticed the batch predictions are stored in a CSV file with an extra column called index, which runs from 0 to the number of rows minus 1. After the **H2oPspScoreBatches** processor makes batch predictions, the first column it writes is this index and the second column holds the predictions. The index column is not needed and can be removed by passing **index=False** to the **batch_scores_df.to_string()** call on line 87. (See the snippet below.)
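   
   For concreteness, the intended change is just the following, using the variable names from the excerpt above:
   
   ```python
   # Drop the pandas row index so only the prediction column(s) end up in the flow file content.
   batch_scores_df_str = batch_scores_df.to_string(index=False)
   ```
   
   If a downstream processor writes the predictions out to a CSV file, `batch_scores_df.to_csv(index=False)` would similarly omit the index column.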







[GitHub] [nifi-minifi-cpp] james94 commented on issue #763: MINIFI-1199: Integrate H2O Driverless AI PSP in MiNiFi

Posted by GitBox <gi...@apache.org>.
james94 commented on issue #763:
URL: https://github.com/apache/nifi-minifi-cpp/pull/763#issuecomment-617528034


   @phrocker I looked at the Travis and AppVeyor failures and they look unrelated to my PR. I didn't see any errors related to the three Python processors above.

