You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/12/05 21:20:00 UTC

[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options

    [ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710643#comment-16710643 ] 

ASF GitHub Bot commented on NIFI-4946:
--------------------------------------

Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2521#discussion_r239241291
  
    --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
    @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro
     
             String sessionId = livyController.get("sessionId");
             String livyUrl = livyController.get("livyUrl");
    -        String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
    -        if (StringUtils.isEmpty(code)) {
    -            try (InputStream inputStream = session.read(flowFile)) {
    -                // If no code was provided, assume it is in the content of the incoming flow file
    -                code = IOUtils.toString(inputStream, charset);
    -            } catch (IOException ioe) {
    -                log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
    -                flowFile = session.penalize(flowFile);
    -                session.transfer(flowFile, REL_FAILURE);
    -                return;
    -            }
    -        }
     
    -        code = StringEscapeUtils.escapeJavaScript(code);
    -        String payload = "{\"code\":\"" + code + "\"}";
    +
             try {
    -            final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval);
    -            log.debug("ExecuteSparkInteractive Result of Job Submit: " + result);
    -            if (result == null) {
    -                session.transfer(flowFile, REL_FAILURE);
    -            } else {
    +
    +            if (isBatchJob) {
    +
    +                String jsonResponse = null;
    +
    +                if (StringUtils.isEmpty(jsonResponse)) {
    +                    try (InputStream inputStream = session.read(flowFile)) {
    +                        // If no code was provided, assume it is in the content of the incoming flow file
    +                        jsonResponse = IOUtils.toString(inputStream, charset);
    +                    } catch (IOException ioe) {
    +                        log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
    +                        flowFile = session.penalize(flowFile);
    +                        session.transfer(flowFile, REL_FAILURE);
    +                        return;
    +                    }
    +                }
    +
    +                log.debug(" ====> jsonResponse: " + jsonResponse);
    +
                     try {
    -                    final JSONObject output = result.getJSONObject("data");
    -                    flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes()));
    -                    flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
    -                    session.transfer(flowFile, REL_SUCCESS);
    -                } catch (JSONException je) {
    -                    // The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing)
    -                    log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)");
    -                    flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes()));
    +
    +                    final JSONObject jsonResponseObj = new JSONObject(jsonResponse);
    +
    +                    Map<String, String> headers = new HashMap<>();
    +                    headers.put("Content-Type", LivySessionService.APPLICATION_JSON);
    +                    headers.put("X-Requested-By", LivySessionService.USER);
    +                    headers.put("Accept", "application/json");
    +
    +                    JSONObject jobInfo = readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, headers);
    +
    +                    flowFile = session.write(flowFile, out -> out.write(jobInfo.toString().getBytes()));
                         flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
    -                    flowFile = session.penalize(flowFile);
    +
    +                    Thread.sleep(statusCheckInterval);
    +
    +                    String state  = jobInfo.getString("state");
    +                    log.debug(" ====> jsonResponseObj State: " + state);
    +
    +                    switch (state) {
    +                        case "success":
    +                            log.debug(" ====> success State: " + state);
    +                            session.transfer(flowFile, REL_SUCCESS);
    +                            break;
    +                        case "dead":
    +                            log.debug(" ====> dead State: " + state);
    +                            session.transfer(flowFile, REL_FAILURE);
    +                            break;
    +                        default:
    +                            log.debug(" ====> default State: " + state);
    +                            session.transfer(flowFile, REL_WAIT);
    +                            break;
    +                    }
    +
    +                } catch (JSONException | InterruptedException e) {
    +
    +                    //Incoming flow file is not an JSON file hence consider it to be an triggering point
    +
    +                    String batchPayload = "{ \"pyFiles\": [\"" +context.getProperty(PY_FILES).getValue()+ "\"], " +
    +                            "\"file\" : \""+context.getProperty(MAIN_PY_FILE).getValue()+"\" }";
    --- End diff --
    
    Since the current processor is called ExecuteSparkInteractive, you could move the batch functionality out into something called SubmitSparkJob or something. The outgoing flow file could contain the application ID and/or any other information that would allow you to monitor the job downstream (perhaps with InvokeHttp through Livy, e.g.) Are you still interested in working on this? It seems like very nice functionality to have in NiFi!


> nifi-spark-bundle : Adding support for pyfiles, file, jars options
> ------------------------------------------------------------------
>
>                 Key: NIFI-4946
>                 URL: https://issues.apache.org/jira/browse/NIFI-4946
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>    Affects Versions: 1.6.0
>         Environment: Ubuntu 16.04, IntelliJ
>            Reporter: Mageswaran
>            Priority: Major
>         Attachments: nifi-spark-options.png, nifi-spark.png
>
>
> Adding support for submitting PySpark based Sparks jobs (which is normally structured as modules) over Livy on existing "ExecuteSparkInteractive" processor.
> This is done by reading file paths for pyfiles and file and an option from user whether the processor should trigger a batch job or not.
> [https://livy.incubator.apache.org/docs/latest/rest-api.html]
>  *Current Work flow Logic ( [https://github.com/apache/nifi/pull/2521 )|https://github.com/apache/nifi/pull/2521]*
>  * Check whether the processor has to handle code or submit a Spark job
>  * Read incoming flow file
>  ** If batch == true
>  *** If flow file matches Livy `batches` JSON response through `wait` loop
>  **** Wait for Status Check Interval
>  **** Read the state
>  **** If state is `running` route it to `wait` or if it  is `success` or `dead` route it accordingly
>  *** Else
>  **** Ignore the flow file
>  **** Trigger the Spark job over Livy `batches` endpoint
>  **** Read the state of the submitted job
>  **** If state is `running` route it to `wait` or if it  is `success` or `dead` route it accordingly
>  ** Else:
>  *** Existing Logic to handle `Code`
>  
> !nifi-spark-options.png!
> !nifi-spark.png!
>  
> Thanks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)