You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by Mageswaran1989 <gi...@git.apache.org> on 2018/03/08 07:43:05 UTC

[GitHub] nifi pull request #2521: NIFI-4946 nifi-spark-bundle : Adding support for py...

GitHub user Mageswaran1989 opened a pull request:

    https://github.com/apache/nifi/pull/2521

    NIFI-4946 nifi-spark-bundle : Adding support for pyfiles, file, jars options

    Thank you for submitting a contribution to Apache NiFi.
    
    In order to streamline the review of the contribution we ask you
    to ensure the following steps have been taken:
    
    ### For all changes:
    - [ x] Is there a JIRA ticket associated with this PR? Is it referenced 
         in the commit message?
    
    - [ x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    
    - [x ] Has your PR been rebased against the latest commit within the target branch (typically master)?
    
    - [ x] Is your initial contribution a single, squashed commit?
    
    ### For code changes:
    - [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
    - [ ] Have you written or updated unit tests to verify your changes?
    - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
    - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
    - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
    - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
    
    ### For documentation related changes:
    - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
    
    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dhiraa/nifi NIFI-4946_nifi-spark-bundle_Adding_support_for_pyfiles

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/2521.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2521
    
----
commit a8301cdad0194328d420282f5e3260127776ed84
Author: Mageswaran <ma...@...>
Date:   2018-03-08T07:39:26Z

    NIFI-4946 initial commit

----


---

[GitHub] nifi pull request #2521: NIFI-4946 nifi-spark-bundle : Adding support for py...

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2521#discussion_r173215603
  
    --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
    @@ -83,6 +83,62 @@
                 .expressionLanguageSupported(true)
                 .build();
     
    +    public static final PropertyDescriptor IS_BATCH_JOB = new PropertyDescriptor.Builder()
    +            .name("exec-spark-iactive-is_batch_job")
    +            .displayName("Is Batch Job")
    +            .description("If true, the `Code` part is ignored and the flow file from previous stage is considered "
    +                    + "as a triggering event and not as code for Spark session. When `Wait` state is self routed"
    +                    + "the livy json response flow file from previous Spark job is used to poll the job status"
    +                    + "for sucess or failure")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor PY_FILES =  new PropertyDescriptor.Builder()
    +            .name("exec-spark-iactive-pyfiles")
    +            .displayName("pyFiles")
    +            .description("Python files to be used in this batch session that includes *.py, *.zip files")
    +            .required(false)
    +            .addValidator(StandardValidators.createURLorFileValidator())
    +            .expressionLanguageSupported(false)
    +            .build();
    +
    +    public static final PropertyDescriptor JAR_FILES =  new PropertyDescriptor.Builder()
    +            .name("exec-spark-iactive-jarfiles")
    +            .displayName("jars")
    --- End diff --
    
    `displayName` is what will be rendered on the UI. So lets change it to JARs or Application JARs?


---

[GitHub] nifi pull request #2521: NIFI-4946 nifi-spark-bundle : Adding support for py...

Posted by Mageswaran1989 <gi...@git.apache.org>.
Github user Mageswaran1989 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2521#discussion_r173348751
  
    --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
    @@ -83,6 +83,62 @@
                 .expressionLanguageSupported(true)
                 .build();
     
    +    public static final PropertyDescriptor IS_BATCH_JOB = new PropertyDescriptor.Builder()
    +            .name("exec-spark-iactive-is_batch_job")
    +            .displayName("Is Batch Job")
    +            .description("If true, the `Code` part is ignored and the flow file from previous stage is considered "
    +                    + "as a triggering event and not as code for Spark session. When `Wait` state is self routed"
    +                    + "the livy json response flow file from previous Spark job is used to poll the job status"
    +                    + "for sucess or failure")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor PY_FILES =  new PropertyDescriptor.Builder()
    +            .name("exec-spark-iactive-pyfiles")
    +            .displayName("pyFiles")
    +            .description("Python files to be used in this batch session that includes *.py, *.zip files")
    +            .required(false)
    +            .addValidator(StandardValidators.createURLorFileValidator())
    +            .expressionLanguageSupported(false)
    +            .build();
    +
    +    public static final PropertyDescriptor JAR_FILES =  new PropertyDescriptor.Builder()
    +            .name("exec-spark-iactive-jarfiles")
    +            .displayName("jars")
    --- End diff --
    
    Sure, will do that.


---

[GitHub] nifi pull request #2521: NIFI-4946 nifi-spark-bundle : Adding support for py...

Posted by Mageswaran1989 <gi...@git.apache.org>.
Github user Mageswaran1989 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2521#discussion_r173349239
  
    --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
    @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro
     
             String sessionId = livyController.get("sessionId");
             String livyUrl = livyController.get("livyUrl");
    -        String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
    -        if (StringUtils.isEmpty(code)) {
    -            try (InputStream inputStream = session.read(flowFile)) {
    -                // If no code was provided, assume it is in the content of the incoming flow file
    -                code = IOUtils.toString(inputStream, charset);
    -            } catch (IOException ioe) {
    -                log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
    -                flowFile = session.penalize(flowFile);
    -                session.transfer(flowFile, REL_FAILURE);
    -                return;
    -            }
    -        }
     
    -        code = StringEscapeUtils.escapeJavaScript(code);
    -        String payload = "{\"code\":\"" + code + "\"}";
    +
             try {
    -            final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval);
    -            log.debug("ExecuteSparkInteractive Result of Job Submit: " + result);
    -            if (result == null) {
    -                session.transfer(flowFile, REL_FAILURE);
    -            } else {
    +
    +            if (isBatchJob) {
    +
    +                String jsonResponse = null;
    +
    +                if (StringUtils.isEmpty(jsonResponse)) {
    --- End diff --
    
    Once current approach is accepted this can be taken care


---

[GitHub] nifi pull request #2521: NIFI-4946 nifi-spark-bundle : Adding support for py...

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2521#discussion_r173223531
  
    --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
    @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro
     
             String sessionId = livyController.get("sessionId");
             String livyUrl = livyController.get("livyUrl");
    -        String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
    -        if (StringUtils.isEmpty(code)) {
    -            try (InputStream inputStream = session.read(flowFile)) {
    -                // If no code was provided, assume it is in the content of the incoming flow file
    -                code = IOUtils.toString(inputStream, charset);
    -            } catch (IOException ioe) {
    -                log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
    -                flowFile = session.penalize(flowFile);
    -                session.transfer(flowFile, REL_FAILURE);
    -                return;
    -            }
    -        }
     
    -        code = StringEscapeUtils.escapeJavaScript(code);
    -        String payload = "{\"code\":\"" + code + "\"}";
    +
             try {
    -            final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval);
    -            log.debug("ExecuteSparkInteractive Result of Job Submit: " + result);
    -            if (result == null) {
    -                session.transfer(flowFile, REL_FAILURE);
    -            } else {
    +
    +            if (isBatchJob) {
    +
    +                String jsonResponse = null;
    +
    +                if (StringUtils.isEmpty(jsonResponse)) {
    +                    try (InputStream inputStream = session.read(flowFile)) {
    +                        // If no code was provided, assume it is in the content of the incoming flow file
    +                        jsonResponse = IOUtils.toString(inputStream, charset);
    +                    } catch (IOException ioe) {
    +                        log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
    +                        flowFile = session.penalize(flowFile);
    +                        session.transfer(flowFile, REL_FAILURE);
    +                        return;
    +                    }
    +                }
    +
    +                log.debug(" ====> jsonResponse: " + jsonResponse);
    +
                     try {
    -                    final JSONObject output = result.getJSONObject("data");
    -                    flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes()));
    -                    flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
    -                    session.transfer(flowFile, REL_SUCCESS);
    -                } catch (JSONException je) {
    -                    // The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing)
    -                    log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)");
    -                    flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes()));
    +
    +                    final JSONObject jsonResponseObj = new JSONObject(jsonResponse);
    +
    +                    Map<String, String> headers = new HashMap<>();
    +                    headers.put("Content-Type", LivySessionService.APPLICATION_JSON);
    +                    headers.put("X-Requested-By", LivySessionService.USER);
    +                    headers.put("Accept", "application/json");
    +
    +                    JSONObject jobInfo = readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, headers);
    +
    +                    flowFile = session.write(flowFile, out -> out.write(jobInfo.toString().getBytes()));
                         flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
    -                    flowFile = session.penalize(flowFile);
    +
    +                    Thread.sleep(statusCheckInterval);
    +
    +                    String state  = jobInfo.getString("state");
    +                    log.debug(" ====> jsonResponseObj State: " + state);
    +
    +                    switch (state) {
    +                        case "success":
    +                            log.debug(" ====> success State: " + state);
    +                            session.transfer(flowFile, REL_SUCCESS);
    +                            break;
    +                        case "dead":
    +                            log.debug(" ====> dead State: " + state);
    +                            session.transfer(flowFile, REL_FAILURE);
    +                            break;
    +                        default:
    +                            log.debug(" ====> default State: " + state);
    +                            session.transfer(flowFile, REL_WAIT);
    +                            break;
    +                    }
    +
    +                } catch (JSONException | InterruptedException e) {
    +
    +                    //Incoming flow file is not an JSON file hence consider it to be an triggering point
    +
    +                    String batchPayload = "{ \"pyFiles\": [\"" +context.getProperty(PY_FILES).getValue()+ "\"], " +
    +                            "\"file\" : \""+context.getProperty(MAIN_PY_FILE).getValue()+"\" }";
    --- End diff --
    
    This is confusing to me. Why are we saying that if the incoming flowfile is not a valid JSON, we are going ahead with the assumption that it is going to be PySpark? I mean the assumption here lacks clarity. Please correct me, if I'm wrong.


---

[GitHub] nifi pull request #2521: NIFI-4946 nifi-spark-bundle : Adding support for py...

Posted by Mageswaran1989 <gi...@git.apache.org>.
Github user Mageswaran1989 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2521#discussion_r175490223
  
    --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
    @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro
     
             String sessionId = livyController.get("sessionId");
             String livyUrl = livyController.get("livyUrl");
    -        String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
    -        if (StringUtils.isEmpty(code)) {
    -            try (InputStream inputStream = session.read(flowFile)) {
    -                // If no code was provided, assume it is in the content of the incoming flow file
    -                code = IOUtils.toString(inputStream, charset);
    -            } catch (IOException ioe) {
    -                log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
    -                flowFile = session.penalize(flowFile);
    -                session.transfer(flowFile, REL_FAILURE);
    -                return;
    -            }
    -        }
     
    -        code = StringEscapeUtils.escapeJavaScript(code);
    -        String payload = "{\"code\":\"" + code + "\"}";
    +
             try {
    -            final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval);
    -            log.debug("ExecuteSparkInteractive Result of Job Submit: " + result);
    -            if (result == null) {
    -                session.transfer(flowFile, REL_FAILURE);
    -            } else {
    +
    +            if (isBatchJob) {
    +
    +                String jsonResponse = null;
    +
    +                if (StringUtils.isEmpty(jsonResponse)) {
    +                    try (InputStream inputStream = session.read(flowFile)) {
    +                        // If no code was provided, assume it is in the content of the incoming flow file
    +                        jsonResponse = IOUtils.toString(inputStream, charset);
    +                    } catch (IOException ioe) {
    +                        log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
    +                        flowFile = session.penalize(flowFile);
    +                        session.transfer(flowFile, REL_FAILURE);
    +                        return;
    +                    }
    +                }
    +
    +                log.debug(" ====> jsonResponse: " + jsonResponse);
    +
                     try {
    -                    final JSONObject output = result.getJSONObject("data");
    -                    flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes()));
    -                    flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
    -                    session.transfer(flowFile, REL_SUCCESS);
    -                } catch (JSONException je) {
    -                    // The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing)
    -                    log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)");
    -                    flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes()));
    +
    +                    final JSONObject jsonResponseObj = new JSONObject(jsonResponse);
    +
    +                    Map<String, String> headers = new HashMap<>();
    +                    headers.put("Content-Type", LivySessionService.APPLICATION_JSON);
    +                    headers.put("X-Requested-By", LivySessionService.USER);
    +                    headers.put("Accept", "application/json");
    +
    +                    JSONObject jobInfo = readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, headers);
    +
    +                    flowFile = session.write(flowFile, out -> out.write(jobInfo.toString().getBytes()));
                         flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
    -                    flowFile = session.penalize(flowFile);
    +
    +                    Thread.sleep(statusCheckInterval);
    +
    +                    String state  = jobInfo.getString("state");
    +                    log.debug(" ====> jsonResponseObj State: " + state);
    +
    +                    switch (state) {
    +                        case "success":
    +                            log.debug(" ====> success State: " + state);
    +                            session.transfer(flowFile, REL_SUCCESS);
    +                            break;
    +                        case "dead":
    +                            log.debug(" ====> dead State: " + state);
    +                            session.transfer(flowFile, REL_FAILURE);
    +                            break;
    +                        default:
    +                            log.debug(" ====> default State: " + state);
    +                            session.transfer(flowFile, REL_WAIT);
    +                            break;
    +                    }
    +
    +                } catch (JSONException | InterruptedException e) {
    +
    +                    //Incoming flow file is not an JSON file hence consider it to be an triggering point
    +
    +                    String batchPayload = "{ \"pyFiles\": [\"" +context.getProperty(PY_FILES).getValue()+ "\"], " +
    +                            "\"file\" : \""+context.getProperty(MAIN_PY_FILE).getValue()+"\" }";
    --- End diff --
    
    As per the code flow @ https://issues.apache.org/jira/browse/NIFI-4946, currently I am able to send *.zip files (Python modules) through livy. My question was what should we do with flowfile, when we are using the processor to submit a batch job?


---

[GitHub] nifi pull request #2521: NIFI-4946 nifi-spark-bundle : Adding support for py...

Posted by Mageswaran1989 <gi...@git.apache.org>.
Github user Mageswaran1989 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2521#discussion_r173350325
  
    --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
    @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro
     
             String sessionId = livyController.get("sessionId");
             String livyUrl = livyController.get("livyUrl");
    -        String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
    -        if (StringUtils.isEmpty(code)) {
    -            try (InputStream inputStream = session.read(flowFile)) {
    -                // If no code was provided, assume it is in the content of the incoming flow file
    -                code = IOUtils.toString(inputStream, charset);
    -            } catch (IOException ioe) {
    -                log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
    -                flowFile = session.penalize(flowFile);
    -                session.transfer(flowFile, REL_FAILURE);
    -                return;
    -            }
    -        }
     
    -        code = StringEscapeUtils.escapeJavaScript(code);
    -        String payload = "{\"code\":\"" + code + "\"}";
    +
             try {
    -            final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval);
    -            log.debug("ExecuteSparkInteractive Result of Job Submit: " + result);
    -            if (result == null) {
    -                session.transfer(flowFile, REL_FAILURE);
    -            } else {
    +
    +            if (isBatchJob) {
    +
    +                String jsonResponse = null;
    +
    +                if (StringUtils.isEmpty(jsonResponse)) {
    +                    try (InputStream inputStream = session.read(flowFile)) {
    +                        // If no code was provided, assume it is in the content of the incoming flow file
    +                        jsonResponse = IOUtils.toString(inputStream, charset);
    +                    } catch (IOException ioe) {
    +                        log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
    +                        flowFile = session.penalize(flowFile);
    +                        session.transfer(flowFile, REL_FAILURE);
    +                        return;
    +                    }
    +                }
    +
    +                log.debug(" ====> jsonResponse: " + jsonResponse);
    +
                     try {
    -                    final JSONObject output = result.getJSONObject("data");
    -                    flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes()));
    -                    flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
    -                    session.transfer(flowFile, REL_SUCCESS);
    -                } catch (JSONException je) {
    -                    // The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing)
    -                    log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)");
    -                    flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes()));
    +
    +                    final JSONObject jsonResponseObj = new JSONObject(jsonResponse);
    +
    +                    Map<String, String> headers = new HashMap<>();
    +                    headers.put("Content-Type", LivySessionService.APPLICATION_JSON);
    +                    headers.put("X-Requested-By", LivySessionService.USER);
    +                    headers.put("Accept", "application/json");
    +
    +                    JSONObject jobInfo = readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, headers);
    +
    +                    flowFile = session.write(flowFile, out -> out.write(jobInfo.toString().getBytes()));
                         flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
    -                    flowFile = session.penalize(flowFile);
    +
    +                    Thread.sleep(statusCheckInterval);
    +
    +                    String state  = jobInfo.getString("state");
    +                    log.debug(" ====> jsonResponseObj State: " + state);
    +
    +                    switch (state) {
    +                        case "success":
    +                            log.debug(" ====> success State: " + state);
    +                            session.transfer(flowFile, REL_SUCCESS);
    +                            break;
    +                        case "dead":
    +                            log.debug(" ====> dead State: " + state);
    +                            session.transfer(flowFile, REL_FAILURE);
    +                            break;
    +                        default:
    +                            log.debug(" ====> default State: " + state);
    +                            session.transfer(flowFile, REL_WAIT);
    +                            break;
    +                    }
    +
    +                } catch (JSONException | InterruptedException e) {
    +
    +                    //Incoming flow file is not an JSON file hence consider it to be an triggering point
    +
    +                    String batchPayload = "{ \"pyFiles\": [\"" +context.getProperty(PY_FILES).getValue()+ "\"], " +
    +                            "\"file\" : \""+context.getProperty(MAIN_PY_FILE).getValue()+"\" }";
    --- End diff --
    
    Could you please check the description @ https://issues.apache.org/jira/browse/NIFI-4946
    
    The assumption was made such that it doesn't break existing code flow and at the same time we wanted to know the status of the submitted job.
    
    So the naive idea was to re-route the Livy Json response back to Spark processor only, so that it can get last submitted `url` from the custom (tampered) JSON response, wait for user specified wait time and again query the Livy for the Job status in a loop till it succeeds or fails.
    
    So when the processor is configured to submit a Spark job, it will expect the incoming flowfile to be an custom Json response with an `url` field to query the Livy, if not it is considered as a triggering point nothing else.
    
    I am open for any ideas from your end.
    
    Thanks.


---

[GitHub] nifi pull request #2521: NIFI-4946 nifi-spark-bundle : Adding support for py...

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2521#discussion_r173226045
  
    --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
    @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro
     
             String sessionId = livyController.get("sessionId");
             String livyUrl = livyController.get("livyUrl");
    -        String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
    -        if (StringUtils.isEmpty(code)) {
    -            try (InputStream inputStream = session.read(flowFile)) {
    -                // If no code was provided, assume it is in the content of the incoming flow file
    -                code = IOUtils.toString(inputStream, charset);
    -            } catch (IOException ioe) {
    -                log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
    -                flowFile = session.penalize(flowFile);
    -                session.transfer(flowFile, REL_FAILURE);
    -                return;
    -            }
    -        }
     
    -        code = StringEscapeUtils.escapeJavaScript(code);
    -        String payload = "{\"code\":\"" + code + "\"}";
    +
             try {
    -            final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval);
    -            log.debug("ExecuteSparkInteractive Result of Job Submit: " + result);
    -            if (result == null) {
    -                session.transfer(flowFile, REL_FAILURE);
    -            } else {
    +
    +            if (isBatchJob) {
    +
    +                String jsonResponse = null;
    +
    +                if (StringUtils.isEmpty(jsonResponse)) {
    +                    try (InputStream inputStream = session.read(flowFile)) {
    +                        // If no code was provided, assume it is in the content of the incoming flow file
    +                        jsonResponse = IOUtils.toString(inputStream, charset);
    +                    } catch (IOException ioe) {
    +                        log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
    +                        flowFile = session.penalize(flowFile);
    +                        session.transfer(flowFile, REL_FAILURE);
    +                        return;
    +                    }
    +                }
    +
    +                log.debug(" ====> jsonResponse: " + jsonResponse);
    +
                     try {
    -                    final JSONObject output = result.getJSONObject("data");
    -                    flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes()));
    -                    flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
    -                    session.transfer(flowFile, REL_SUCCESS);
    -                } catch (JSONException je) {
    -                    // The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing)
    -                    log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)");
    -                    flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes()));
    +
    +                    final JSONObject jsonResponseObj = new JSONObject(jsonResponse);
    +
    +                    Map<String, String> headers = new HashMap<>();
    +                    headers.put("Content-Type", LivySessionService.APPLICATION_JSON);
    +                    headers.put("X-Requested-By", LivySessionService.USER);
    +                    headers.put("Accept", "application/json");
    +
    +                    JSONObject jobInfo = readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, headers);
    +
    +                    flowFile = session.write(flowFile, out -> out.write(jobInfo.toString().getBytes()));
                         flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
    -                    flowFile = session.penalize(flowFile);
    +
    +                    Thread.sleep(statusCheckInterval);
    +
    +                    String state  = jobInfo.getString("state");
    +                    log.debug(" ====> jsonResponseObj State: " + state);
    +
    +                    switch (state) {
    +                        case "success":
    +                            log.debug(" ====> success State: " + state);
    +                            session.transfer(flowFile, REL_SUCCESS);
    +                            break;
    +                        case "dead":
    +                            log.debug(" ====> dead State: " + state);
    +                            session.transfer(flowFile, REL_FAILURE);
    +                            break;
    +                        default:
    +                            log.debug(" ====> default State: " + state);
    +                            session.transfer(flowFile, REL_WAIT);
    +                            break;
    +                    }
    +
    +                } catch (JSONException | InterruptedException e) {
    +
    +                    //Incoming flow file is not an JSON file hence consider it to be an triggering point
    --- End diff --
    
    Cosmetic change: Multiple empty lines were left. IMHO, one empty line should be enough for better readability. 


---

[GitHub] nifi pull request #2521: NIFI-4946 nifi-spark-bundle : Adding support for py...

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2521#discussion_r173216966
  
    --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
    @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro
     
             String sessionId = livyController.get("sessionId");
             String livyUrl = livyController.get("livyUrl");
    -        String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
    -        if (StringUtils.isEmpty(code)) {
    -            try (InputStream inputStream = session.read(flowFile)) {
    -                // If no code was provided, assume it is in the content of the incoming flow file
    -                code = IOUtils.toString(inputStream, charset);
    -            } catch (IOException ioe) {
    -                log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
    -                flowFile = session.penalize(flowFile);
    -                session.transfer(flowFile, REL_FAILURE);
    -                return;
    -            }
    -        }
     
    -        code = StringEscapeUtils.escapeJavaScript(code);
    -        String payload = "{\"code\":\"" + code + "\"}";
    +
             try {
    -            final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval);
    -            log.debug("ExecuteSparkInteractive Result of Job Submit: " + result);
    -            if (result == null) {
    -                session.transfer(flowFile, REL_FAILURE);
    -            } else {
    +
    +            if (isBatchJob) {
    +
    +                String jsonResponse = null;
    +
    +                if (StringUtils.isEmpty(jsonResponse)) {
    --- End diff --
    
    This will be true all the time, right?


---

[GitHub] nifi pull request #2521: NIFI-4946 nifi-spark-bundle : Adding support for py...

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2521#discussion_r173216342
  
    --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
    @@ -83,6 +83,62 @@
                 .expressionLanguageSupported(true)
                 .build();
     
    +    public static final PropertyDescriptor IS_BATCH_JOB = new PropertyDescriptor.Builder()
    +            .name("exec-spark-iactive-is_batch_job")
    +            .displayName("Is Batch Job")
    +            .description("If true, the `Code` part is ignored and the flow file from previous stage is considered "
    +                    + "as a triggering event and not as code for Spark session. When `Wait` state is self routed"
    +                    + "the livy json response flow file from previous Spark job is used to poll the job status"
    +                    + "for sucess or failure")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor PY_FILES =  new PropertyDescriptor.Builder()
    +            .name("exec-spark-iactive-pyfiles")
    +            .displayName("pyFiles")
    +            .description("Python files to be used in this batch session that includes *.py, *.zip files")
    +            .required(false)
    +            .addValidator(StandardValidators.createURLorFileValidator())
    +            .expressionLanguageSupported(false)
    +            .build();
    +
    +    public static final PropertyDescriptor JAR_FILES =  new PropertyDescriptor.Builder()
    +            .name("exec-spark-iactive-jarfiles")
    +            .displayName("jars")
    +            .description("jars to be used in this batch session")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(false)
    +            .build();
    +
    +    public static final PropertyDescriptor NAME =  new PropertyDescriptor.Builder()
    --- End diff --
    
    Is this supposed to be the Spark app name? Looks it is never used anywhere other than adding to the `PropertyDescriptor` list


---

[GitHub] nifi pull request #2521: NIFI-4946 nifi-spark-bundle : Adding support for py...

Posted by Mageswaran1989 <gi...@git.apache.org>.
Github user Mageswaran1989 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2521#discussion_r173349052
  
    --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
    @@ -83,6 +83,62 @@
                 .expressionLanguageSupported(true)
                 .build();
     
    +    public static final PropertyDescriptor IS_BATCH_JOB = new PropertyDescriptor.Builder()
    +            .name("exec-spark-iactive-is_batch_job")
    +            .displayName("Is Batch Job")
    +            .description("If true, the `Code` part is ignored and the flow file from previous stage is considered "
    +                    + "as a triggering event and not as code for Spark session. When `Wait` state is self routed"
    +                    + "the livy json response flow file from previous Spark job is used to poll the job status"
    +                    + "for sucess or failure")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor PY_FILES =  new PropertyDescriptor.Builder()
    +            .name("exec-spark-iactive-pyfiles")
    +            .displayName("pyFiles")
    +            .description("Python files to be used in this batch session that includes *.py, *.zip files")
    +            .required(false)
    +            .addValidator(StandardValidators.createURLorFileValidator())
    +            .expressionLanguageSupported(false)
    +            .build();
    +
    +    public static final PropertyDescriptor JAR_FILES =  new PropertyDescriptor.Builder()
    +            .name("exec-spark-iactive-jarfiles")
    +            .displayName("jars")
    +            .description("jars to be used in this batch session")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(false)
    +            .build();
    +
    +    public static final PropertyDescriptor NAME =  new PropertyDescriptor.Builder()
    --- End diff --
    
    Like said before, not yet considered. We just wanted to get a hang of the code with our basic requirements


---

[GitHub] nifi pull request #2521: NIFI-4946 nifi-spark-bundle : Adding support for py...

Posted by Mageswaran1989 <gi...@git.apache.org>.
Github user Mageswaran1989 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2521#discussion_r175496952
  
    --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
    @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro
     
             String sessionId = livyController.get("sessionId");
             String livyUrl = livyController.get("livyUrl");
    -        String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
    -        if (StringUtils.isEmpty(code)) {
    -            try (InputStream inputStream = session.read(flowFile)) {
    -                // If no code was provided, assume it is in the content of the incoming flow file
    -                code = IOUtils.toString(inputStream, charset);
    -            } catch (IOException ioe) {
    -                log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
    -                flowFile = session.penalize(flowFile);
    -                session.transfer(flowFile, REL_FAILURE);
    -                return;
    -            }
    -        }
     
    -        code = StringEscapeUtils.escapeJavaScript(code);
    -        String payload = "{\"code\":\"" + code + "\"}";
    +
             try {
    -            final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval);
    -            log.debug("ExecuteSparkInteractive Result of Job Submit: " + result);
    -            if (result == null) {
    -                session.transfer(flowFile, REL_FAILURE);
    -            } else {
    +
    +            if (isBatchJob) {
    +
    +                String jsonResponse = null;
    +
    +                if (StringUtils.isEmpty(jsonResponse)) {
    +                    try (InputStream inputStream = session.read(flowFile)) {
    +                        // If no code was provided, assume it is in the content of the incoming flow file
    +                        jsonResponse = IOUtils.toString(inputStream, charset);
    +                    } catch (IOException ioe) {
    +                        log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
    +                        flowFile = session.penalize(flowFile);
    +                        session.transfer(flowFile, REL_FAILURE);
    +                        return;
    +                    }
    +                }
    +
    +                log.debug(" ====> jsonResponse: " + jsonResponse);
    +
                     try {
    -                    final JSONObject output = result.getJSONObject("data");
    -                    flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes()));
    -                    flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
    -                    session.transfer(flowFile, REL_SUCCESS);
    -                } catch (JSONException je) {
    -                    // The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing)
    -                    log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)");
    -                    flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes()));
    +
    +                    final JSONObject jsonResponseObj = new JSONObject(jsonResponse);
    +
    +                    Map<String, String> headers = new HashMap<>();
    +                    headers.put("Content-Type", LivySessionService.APPLICATION_JSON);
    +                    headers.put("X-Requested-By", LivySessionService.USER);
    +                    headers.put("Accept", "application/json");
    +
    +                    JSONObject jobInfo = readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, headers);
    +
    +                    flowFile = session.write(flowFile, out -> out.write(jobInfo.toString().getBytes()));
                         flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
    -                    flowFile = session.penalize(flowFile);
    +
    +                    Thread.sleep(statusCheckInterval);
    +
    +                    String state  = jobInfo.getString("state");
    +                    log.debug(" ====> jsonResponseObj State: " + state);
    +
    +                    switch (state) {
    +                        case "success":
    +                            log.debug(" ====> success State: " + state);
    +                            session.transfer(flowFile, REL_SUCCESS);
    +                            break;
    +                        case "dead":
    +                            log.debug(" ====> dead State: " + state);
    +                            session.transfer(flowFile, REL_FAILURE);
    +                            break;
    +                        default:
    +                            log.debug(" ====> default State: " + state);
    +                            session.transfer(flowFile, REL_WAIT);
    +                            break;
    +                    }
    +
    +                } catch (JSONException | InterruptedException e) {
    +
    +                    //Incoming flow file is not an JSON file hence consider it to be an triggering point
    +
    +                    String batchPayload = "{ \"pyFiles\": [\"" +context.getProperty(PY_FILES).getValue()+ "\"], " +
    +                            "\"file\" : \""+context.getProperty(MAIN_PY_FILE).getValue()+"\" }";
    --- End diff --
    
    Sometime this week I am planning to add support for jar files, args and application name over the Livy options.
    
    The catch here is unlike plain Spark code, batch process code will take time to finish which is expected one as we know. So as a hack I was re-routing the Json response after batch submission to itself, where I poll the incoming flowfile and check whether it is a Json file and if so I will try to get the "livy url" to post again to know the status of the batch job as long as it runs. After knowing the the job finished, the success route is triggered.
    
    That was the reason the I have made an assumption if the incoming file is Json, it is from previous batch job submission.
    
    In short, the flow file :
    - Is considered as a triggering point (or)
    - Is considered as plain Spark code that compiles over Livy (or)
    - Is a Livy Json response, which can further be used to check the status of long running Spark batch job
    
    I was looking for the right Nifi way of handling this? 
    
    I feel I am too conservative and trying to fit all the functionalities on one processor.
    - Flow file/property can be used to run a Spark code
    - Pyfiles can be used to run Spark batch job
    - Jars can be used to run Spark batch job
    - Args options for batch mode
    - By rerouting the success to itself, it can monitor the long running job over Livy rest APIs


---

[GitHub] nifi issue #2521: NIFI-4946 nifi-spark-bundle : Adding support for pyfiles, ...

Posted by Mageswaran1989 <gi...@git.apache.org>.
Github user Mageswaran1989 commented on the issue:

    https://github.com/apache/nifi/pull/2521
  
    Team,
    
    This MR was created in line with our current requirements. 
    Currently the changes are tested manually and found working.
    
    We would like to make this changes go in mainline, for which we need community help with reviewing and adding test code, since we are new to the Nifi extensions.
    
    Next plan is to add test cases : 
    - Create a sample PySpark example modules as part of Test resources 
    - Use that in tests
    
    Could any one point out some existing test cases that does similar testing.



---

[GitHub] nifi pull request #2521: NIFI-4946 nifi-spark-bundle : Adding support for py...

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2521#discussion_r173213566
  
    --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
    @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro
     
             String sessionId = livyController.get("sessionId");
             String livyUrl = livyController.get("livyUrl");
    -        String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
    -        if (StringUtils.isEmpty(code)) {
    -            try (InputStream inputStream = session.read(flowFile)) {
    -                // If no code was provided, assume it is in the content of the incoming flow file
    -                code = IOUtils.toString(inputStream, charset);
    -            } catch (IOException ioe) {
    -                log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
    -                flowFile = session.penalize(flowFile);
    -                session.transfer(flowFile, REL_FAILURE);
    -                return;
    -            }
    -        }
     
    -        code = StringEscapeUtils.escapeJavaScript(code);
    -        String payload = "{\"code\":\"" + code + "\"}";
    +
             try {
    -            final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval);
    -            log.debug("ExecuteSparkInteractive Result of Job Submit: " + result);
    -            if (result == null) {
    -                session.transfer(flowFile, REL_FAILURE);
    -            } else {
    +
    +            if (isBatchJob) {
    +
    +                String jsonResponse = null;
    +
    +                if (StringUtils.isEmpty(jsonResponse)) {
    +                    try (InputStream inputStream = session.read(flowFile)) {
    +                        // If no code was provided, assume it is in the content of the incoming flow file
    +                        jsonResponse = IOUtils.toString(inputStream, charset);
    +                    } catch (IOException ioe) {
    +                        log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
    +                        flowFile = session.penalize(flowFile);
    +                        session.transfer(flowFile, REL_FAILURE);
    +                        return;
    +                    }
    +                }
    +
    +                log.debug(" ====> jsonResponse: " + jsonResponse);
    +
                     try {
    -                    final JSONObject output = result.getJSONObject("data");
    -                    flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes()));
    -                    flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
    -                    session.transfer(flowFile, REL_SUCCESS);
    -                } catch (JSONException je) {
    -                    // The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing)
    -                    log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)");
    -                    flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes()));
    +
    +                    final JSONObject jsonResponseObj = new JSONObject(jsonResponse);
    +
    +                    Map<String, String> headers = new HashMap<>();
    +                    headers.put("Content-Type", LivySessionService.APPLICATION_JSON);
    +                    headers.put("X-Requested-By", LivySessionService.USER);
    +                    headers.put("Accept", "application/json");
    +
    +                    JSONObject jobInfo = readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, headers);
    +
    +                    flowFile = session.write(flowFile, out -> out.write(jobInfo.toString().getBytes()));
                         flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
    -                    flowFile = session.penalize(flowFile);
    +
    +                    Thread.sleep(statusCheckInterval);
    +
    +                    String state  = jobInfo.getString("state");
    +                    log.debug(" ====> jsonResponseObj State: " + state);
    +
    +                    switch (state) {
    +                        case "success":
    +                            log.debug(" ====> success State: " + state);
    +                            session.transfer(flowFile, REL_SUCCESS);
    +                            break;
    +                        case "dead":
    +                            log.debug(" ====> dead State: " + state);
    +                            session.transfer(flowFile, REL_FAILURE);
    +                            break;
    +                        default:
    +                            log.debug(" ====> default State: " + state);
    +                            session.transfer(flowFile, REL_WAIT);
    --- End diff --
    
    Same as above for these log.debug messages as well.


---

[GitHub] nifi pull request #2521: NIFI-4946 nifi-spark-bundle : Adding support for py...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2521#discussion_r239241291
  
    --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
    @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro
     
             String sessionId = livyController.get("sessionId");
             String livyUrl = livyController.get("livyUrl");
    -        String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
    -        if (StringUtils.isEmpty(code)) {
    -            try (InputStream inputStream = session.read(flowFile)) {
    -                // If no code was provided, assume it is in the content of the incoming flow file
    -                code = IOUtils.toString(inputStream, charset);
    -            } catch (IOException ioe) {
    -                log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
    -                flowFile = session.penalize(flowFile);
    -                session.transfer(flowFile, REL_FAILURE);
    -                return;
    -            }
    -        }
     
    -        code = StringEscapeUtils.escapeJavaScript(code);
    -        String payload = "{\"code\":\"" + code + "\"}";
    +
             try {
    -            final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval);
    -            log.debug("ExecuteSparkInteractive Result of Job Submit: " + result);
    -            if (result == null) {
    -                session.transfer(flowFile, REL_FAILURE);
    -            } else {
    +
    +            if (isBatchJob) {
    +
    +                String jsonResponse = null;
    +
    +                if (StringUtils.isEmpty(jsonResponse)) {
    +                    try (InputStream inputStream = session.read(flowFile)) {
    +                        // If no code was provided, assume it is in the content of the incoming flow file
    +                        jsonResponse = IOUtils.toString(inputStream, charset);
    +                    } catch (IOException ioe) {
    +                        log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
    +                        flowFile = session.penalize(flowFile);
    +                        session.transfer(flowFile, REL_FAILURE);
    +                        return;
    +                    }
    +                }
    +
    +                log.debug(" ====> jsonResponse: " + jsonResponse);
    +
                     try {
    -                    final JSONObject output = result.getJSONObject("data");
    -                    flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes()));
    -                    flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
    -                    session.transfer(flowFile, REL_SUCCESS);
    -                } catch (JSONException je) {
    -                    // The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing)
    -                    log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)");
    -                    flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes()));
    +
    +                    final JSONObject jsonResponseObj = new JSONObject(jsonResponse);
    +
    +                    Map<String, String> headers = new HashMap<>();
    +                    headers.put("Content-Type", LivySessionService.APPLICATION_JSON);
    +                    headers.put("X-Requested-By", LivySessionService.USER);
    +                    headers.put("Accept", "application/json");
    +
    +                    JSONObject jobInfo = readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, headers);
    +
    +                    flowFile = session.write(flowFile, out -> out.write(jobInfo.toString().getBytes()));
                         flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
    -                    flowFile = session.penalize(flowFile);
    +
    +                    Thread.sleep(statusCheckInterval);
    +
    +                    String state  = jobInfo.getString("state");
    +                    log.debug(" ====> jsonResponseObj State: " + state);
    +
    +                    switch (state) {
    +                        case "success":
    +                            log.debug(" ====> success State: " + state);
    +                            session.transfer(flowFile, REL_SUCCESS);
    +                            break;
    +                        case "dead":
    +                            log.debug(" ====> dead State: " + state);
    +                            session.transfer(flowFile, REL_FAILURE);
    +                            break;
    +                        default:
    +                            log.debug(" ====> default State: " + state);
    +                            session.transfer(flowFile, REL_WAIT);
    +                            break;
    +                    }
    +
    +                } catch (JSONException | InterruptedException e) {
    +
    +                    //Incoming flow file is not an JSON file hence consider it to be an triggering point
    +
    +                    String batchPayload = "{ \"pyFiles\": [\"" +context.getProperty(PY_FILES).getValue()+ "\"], " +
    +                            "\"file\" : \""+context.getProperty(MAIN_PY_FILE).getValue()+"\" }";
    --- End diff --
    
    Since the current processor is called ExecuteSparkInteractive, you could move the batch functionality out into something called SubmitSparkJob or something. The outgoing flow file could contain the application ID and/or any other information that would allow you to monitor the job downstream (perhaps with InvokeHttp through Livy, e.g.) Are you still interested in working on this? It seems like very nice functionality to have in NiFi!


---

[GitHub] nifi pull request #2521: NIFI-4946 nifi-spark-bundle : Adding support for py...

Posted by Mageswaran1989 <gi...@git.apache.org>.
Github user Mageswaran1989 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2521#discussion_r173348709
  
    --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
    @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro
     
             String sessionId = livyController.get("sessionId");
             String livyUrl = livyController.get("livyUrl");
    -        String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
    -        if (StringUtils.isEmpty(code)) {
    -            try (InputStream inputStream = session.read(flowFile)) {
    -                // If no code was provided, assume it is in the content of the incoming flow file
    -                code = IOUtils.toString(inputStream, charset);
    -            } catch (IOException ioe) {
    -                log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
    -                flowFile = session.penalize(flowFile);
    -                session.transfer(flowFile, REL_FAILURE);
    -                return;
    -            }
    -        }
     
    -        code = StringEscapeUtils.escapeJavaScript(code);
    -        String payload = "{\"code\":\"" + code + "\"}";
    +
             try {
    -            final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval);
    -            log.debug("ExecuteSparkInteractive Result of Job Submit: " + result);
    -            if (result == null) {
    -                session.transfer(flowFile, REL_FAILURE);
    -            } else {
    +
    +            if (isBatchJob) {
    +
    +                String jsonResponse = null;
    +
    +                if (StringUtils.isEmpty(jsonResponse)) {
    +                    try (InputStream inputStream = session.read(flowFile)) {
    +                        // If no code was provided, assume it is in the content of the incoming flow file
    +                        jsonResponse = IOUtils.toString(inputStream, charset);
    +                    } catch (IOException ioe) {
    +                        log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
    +                        flowFile = session.penalize(flowFile);
    +                        session.transfer(flowFile, REL_FAILURE);
    +                        return;
    +                    }
    +                }
    +
    +                log.debug(" ====> jsonResponse: " + jsonResponse);
    --- End diff --
    
    Sure, this will be removed in the next commit.


---

[GitHub] nifi pull request #2521: NIFI-4946 nifi-spark-bundle : Adding support for py...

Posted by Mageswaran1989 <gi...@git.apache.org>.
Github user Mageswaran1989 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2521#discussion_r175417724
  
    --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
    @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro
     
             String sessionId = livyController.get("sessionId");
             String livyUrl = livyController.get("livyUrl");
    -        String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
    -        if (StringUtils.isEmpty(code)) {
    -            try (InputStream inputStream = session.read(flowFile)) {
    -                // If no code was provided, assume it is in the content of the incoming flow file
    -                code = IOUtils.toString(inputStream, charset);
    -            } catch (IOException ioe) {
    -                log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
    -                flowFile = session.penalize(flowFile);
    -                session.transfer(flowFile, REL_FAILURE);
    -                return;
    -            }
    -        }
     
    -        code = StringEscapeUtils.escapeJavaScript(code);
    -        String payload = "{\"code\":\"" + code + "\"}";
    +
             try {
    -            final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval);
    -            log.debug("ExecuteSparkInteractive Result of Job Submit: " + result);
    -            if (result == null) {
    -                session.transfer(flowFile, REL_FAILURE);
    -            } else {
    +
    +            if (isBatchJob) {
    +
    +                String jsonResponse = null;
    +
    +                if (StringUtils.isEmpty(jsonResponse)) {
    +                    try (InputStream inputStream = session.read(flowFile)) {
    +                        // If no code was provided, assume it is in the content of the incoming flow file
    +                        jsonResponse = IOUtils.toString(inputStream, charset);
    +                    } catch (IOException ioe) {
    +                        log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
    +                        flowFile = session.penalize(flowFile);
    +                        session.transfer(flowFile, REL_FAILURE);
    +                        return;
    +                    }
    +                }
    +
    +                log.debug(" ====> jsonResponse: " + jsonResponse);
    +
                     try {
    -                    final JSONObject output = result.getJSONObject("data");
    -                    flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes()));
    -                    flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
    -                    session.transfer(flowFile, REL_SUCCESS);
    -                } catch (JSONException je) {
    -                    // The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing)
    -                    log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)");
    -                    flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes()));
    +
    +                    final JSONObject jsonResponseObj = new JSONObject(jsonResponse);
    +
    +                    Map<String, String> headers = new HashMap<>();
    +                    headers.put("Content-Type", LivySessionService.APPLICATION_JSON);
    +                    headers.put("X-Requested-By", LivySessionService.USER);
    +                    headers.put("Accept", "application/json");
    +
    +                    JSONObject jobInfo = readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, headers);
    +
    +                    flowFile = session.write(flowFile, out -> out.write(jobInfo.toString().getBytes()));
                         flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
    -                    flowFile = session.penalize(flowFile);
    +
    +                    Thread.sleep(statusCheckInterval);
    +
    +                    String state  = jobInfo.getString("state");
    +                    log.debug(" ====> jsonResponseObj State: " + state);
    +
    +                    switch (state) {
    +                        case "success":
    +                            log.debug(" ====> success State: " + state);
    +                            session.transfer(flowFile, REL_SUCCESS);
    +                            break;
    +                        case "dead":
    +                            log.debug(" ====> dead State: " + state);
    +                            session.transfer(flowFile, REL_FAILURE);
    +                            break;
    +                        default:
    +                            log.debug(" ====> default State: " + state);
    +                            session.transfer(flowFile, REL_WAIT);
    +                            break;
    +                    }
    +
    +                } catch (JSONException | InterruptedException e) {
    +
    +                    //Incoming flow file is not an JSON file hence consider it to be an triggering point
    +
    +                    String batchPayload = "{ \"pyFiles\": [\"" +context.getProperty(PY_FILES).getValue()+ "\"], " +
    +                            "\"file\" : \""+context.getProperty(MAIN_PY_FILE).getValue()+"\" }";
    --- End diff --
    
    @zenfenan could please review above logic and suggest a way to handle plain Scala/Python code and packages source files for pyfiles/jars ?


---

[GitHub] nifi pull request #2521: NIFI-4946 nifi-spark-bundle : Adding support for py...

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2521#discussion_r173211644
  
    --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
    @@ -129,7 +185,13 @@
         public void init(final ProcessorInitializationContext context) {
             List<PropertyDescriptor> properties = new ArrayList<>();
             properties.add(LIVY_CONTROLLER_SERVICE);
    +        properties.add(IS_BATCH_JOB);
    +        properties.add(PY_FILES);
    +//        properties.add(JAR_FILES);
    +        properties.add(MAIN_PY_FILE);
    +        properties.add(NAME);
             properties.add(CODE);
    +//        properties.add(ARGS);
    --- End diff --
    
    Comments to be removed?


---

[GitHub] nifi pull request #2521: NIFI-4946 nifi-spark-bundle : Adding support for py...

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2521#discussion_r173216684
  
    --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
    @@ -83,6 +83,62 @@
                 .expressionLanguageSupported(true)
                 .build();
     
    +    public static final PropertyDescriptor IS_BATCH_JOB = new PropertyDescriptor.Builder()
    +            .name("exec-spark-iactive-is_batch_job")
    +            .displayName("Is Batch Job")
    +            .description("If true, the `Code` part is ignored and the flow file from previous stage is considered "
    +                    + "as a triggering event and not as code for Spark session. When `Wait` state is self routed"
    +                    + "the livy json response flow file from previous Spark job is used to poll the job status"
    +                    + "for sucess or failure")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor PY_FILES =  new PropertyDescriptor.Builder()
    +            .name("exec-spark-iactive-pyfiles")
    +            .displayName("pyFiles")
    +            .description("Python files to be used in this batch session that includes *.py, *.zip files")
    +            .required(false)
    +            .addValidator(StandardValidators.createURLorFileValidator())
    +            .expressionLanguageSupported(false)
    +            .build();
    +
    +    public static final PropertyDescriptor JAR_FILES =  new PropertyDescriptor.Builder()
    +            .name("exec-spark-iactive-jarfiles")
    +            .displayName("jars")
    +            .description("jars to be used in this batch session")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(false)
    +            .build();
    +
    +    public static final PropertyDescriptor NAME =  new PropertyDescriptor.Builder()
    +            .name("exec-spark-iactive-name")
    +            .displayName("name")
    +            .description("The name of this session")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(false)
    +            .build();
    +
    +    public static final PropertyDescriptor ARGS =  new PropertyDescriptor.Builder()
    +            .name("exec-spark-iactive-args")
    +            .displayName("args")
    +            .description("The name of this session")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(false)
    +            .build();
    +
    +    public static final PropertyDescriptor MAIN_PY_FILE = new PropertyDescriptor.Builder()
    +            .name("exec-spark-iactive-main-py-file")
    +            .displayName("file")
    --- End diff --
    
    Same as the JARs case. Most of the `PropertyDescriptor` use all lowercase characters for `displayName`. Please change it.


---

[GitHub] nifi pull request #2521: NIFI-4946 nifi-spark-bundle : Adding support for py...

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2521#discussion_r175481806
  
    --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
    @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro
     
             String sessionId = livyController.get("sessionId");
             String livyUrl = livyController.get("livyUrl");
    -        String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
    -        if (StringUtils.isEmpty(code)) {
    -            try (InputStream inputStream = session.read(flowFile)) {
    -                // If no code was provided, assume it is in the content of the incoming flow file
    -                code = IOUtils.toString(inputStream, charset);
    -            } catch (IOException ioe) {
    -                log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
    -                flowFile = session.penalize(flowFile);
    -                session.transfer(flowFile, REL_FAILURE);
    -                return;
    -            }
    -        }
     
    -        code = StringEscapeUtils.escapeJavaScript(code);
    -        String payload = "{\"code\":\"" + code + "\"}";
    +
             try {
    -            final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval);
    -            log.debug("ExecuteSparkInteractive Result of Job Submit: " + result);
    -            if (result == null) {
    -                session.transfer(flowFile, REL_FAILURE);
    -            } else {
    +
    +            if (isBatchJob) {
    +
    +                String jsonResponse = null;
    +
    +                if (StringUtils.isEmpty(jsonResponse)) {
    +                    try (InputStream inputStream = session.read(flowFile)) {
    +                        // If no code was provided, assume it is in the content of the incoming flow file
    +                        jsonResponse = IOUtils.toString(inputStream, charset);
    +                    } catch (IOException ioe) {
    +                        log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
    +                        flowFile = session.penalize(flowFile);
    +                        session.transfer(flowFile, REL_FAILURE);
    +                        return;
    +                    }
    +                }
    +
    +                log.debug(" ====> jsonResponse: " + jsonResponse);
    +
                     try {
    -                    final JSONObject output = result.getJSONObject("data");
    -                    flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes()));
    -                    flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
    -                    session.transfer(flowFile, REL_SUCCESS);
    -                } catch (JSONException je) {
    -                    // The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing)
    -                    log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)");
    -                    flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes()));
    +
    +                    final JSONObject jsonResponseObj = new JSONObject(jsonResponse);
    +
    +                    Map<String, String> headers = new HashMap<>();
    +                    headers.put("Content-Type", LivySessionService.APPLICATION_JSON);
    +                    headers.put("X-Requested-By", LivySessionService.USER);
    +                    headers.put("Accept", "application/json");
    +
    +                    JSONObject jobInfo = readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, headers);
    +
    +                    flowFile = session.write(flowFile, out -> out.write(jobInfo.toString().getBytes()));
                         flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
    -                    flowFile = session.penalize(flowFile);
    +
    +                    Thread.sleep(statusCheckInterval);
    +
    +                    String state  = jobInfo.getString("state");
    +                    log.debug(" ====> jsonResponseObj State: " + state);
    +
    +                    switch (state) {
    +                        case "success":
    +                            log.debug(" ====> success State: " + state);
    +                            session.transfer(flowFile, REL_SUCCESS);
    +                            break;
    +                        case "dead":
    +                            log.debug(" ====> dead State: " + state);
    +                            session.transfer(flowFile, REL_FAILURE);
    +                            break;
    +                        default:
    +                            log.debug(" ====> default State: " + state);
    +                            session.transfer(flowFile, REL_WAIT);
    +                            break;
    +                    }
    +
    +                } catch (JSONException | InterruptedException e) {
    +
    +                    //Incoming flow file is not an JSON file hence consider it to be an triggering point
    +
    +                    String batchPayload = "{ \"pyFiles\": [\"" +context.getProperty(PY_FILES).getValue()+ "\"], " +
    +                            "\"file\" : \""+context.getProperty(MAIN_PY_FILE).getValue()+"\" }";
    --- End diff --
    
    The code can be submitted using the `CODE` property, right? That used to work. Or are you asking of a way to upload/send files or jars through Livy?


---

[GitHub] nifi pull request #2521: NIFI-4946 nifi-spark-bundle : Adding support for py...

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2521#discussion_r173213146
  
    --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
    @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro
     
             String sessionId = livyController.get("sessionId");
             String livyUrl = livyController.get("livyUrl");
    -        String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
    -        if (StringUtils.isEmpty(code)) {
    -            try (InputStream inputStream = session.read(flowFile)) {
    -                // If no code was provided, assume it is in the content of the incoming flow file
    -                code = IOUtils.toString(inputStream, charset);
    -            } catch (IOException ioe) {
    -                log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
    -                flowFile = session.penalize(flowFile);
    -                session.transfer(flowFile, REL_FAILURE);
    -                return;
    -            }
    -        }
     
    -        code = StringEscapeUtils.escapeJavaScript(code);
    -        String payload = "{\"code\":\"" + code + "\"}";
    +
             try {
    -            final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval);
    -            log.debug("ExecuteSparkInteractive Result of Job Submit: " + result);
    -            if (result == null) {
    -                session.transfer(flowFile, REL_FAILURE);
    -            } else {
    +
    +            if (isBatchJob) {
    +
    +                String jsonResponse = null;
    +
    +                if (StringUtils.isEmpty(jsonResponse)) {
    +                    try (InputStream inputStream = session.read(flowFile)) {
    +                        // If no code was provided, assume it is in the content of the incoming flow file
    +                        jsonResponse = IOUtils.toString(inputStream, charset);
    +                    } catch (IOException ioe) {
    +                        log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
    +                        flowFile = session.penalize(flowFile);
    +                        session.transfer(flowFile, REL_FAILURE);
    +                        return;
    +                    }
    +                }
    +
    +                log.debug(" ====> jsonResponse: " + jsonResponse);
    --- End diff --
    
    Cosmetic change: It would be great if this log.debug message can be changed to something of a proper standard, like "JSON Response : <the JSON response>" i.e. Remove ====> 


---

[GitHub] nifi pull request #2521: NIFI-4946 nifi-spark-bundle : Adding support for py...

Posted by Mageswaran1989 <gi...@git.apache.org>.
Github user Mageswaran1989 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2521#discussion_r173348636
  
    --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
    @@ -129,7 +185,13 @@
         public void init(final ProcessorInitializationContext context) {
             List<PropertyDescriptor> properties = new ArrayList<>();
             properties.add(LIVY_CONTROLLER_SERVICE);
    +        properties.add(IS_BATCH_JOB);
    +        properties.add(PY_FILES);
    +//        properties.add(JAR_FILES);
    +        properties.add(MAIN_PY_FILE);
    +        properties.add(NAME);
             properties.add(CODE);
    +//        properties.add(ARGS);
    --- End diff --
    
    Only `pyfiles` and `file` options are tested. Rest are yet to be tested.
    
    Plan was to go with implementing test modules and test other features, since the current manual testing takes a long routine of compile, copy and restart of the Nifi.


---