You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by "Chris Sampson (Jira)" <ji...@apache.org> on 2022/10/17 13:27:00 UTC

[jira] [Comment Edited] (NIFI-5982) Processor PutElasticsearchHttpRecord should a relation Response

    [ https://issues.apache.org/jira/browse/NIFI-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17618816#comment-17618816 ] 

Chris Sampson edited comment on NIFI-5982 at 10/17/22 1:26 PM:
---------------------------------------------------------------

My recommendation would be to use the newer {{PutElasticearchRecord}} processor, which uses the Elasticsearch REST Client libraries for connections and has been tested against newer versions of Elasticsearch (5 - 8). As of NIFI-9581, this processor includes an {{errors}} relationship, which does exactly what you suggest here - i.e. output any Record(s) from the {{input}} FlowFile that caused errors when sent to Elasticsearch.

The {{successful_records}} relationship will also output the Record(s) that were successfully sent to Elasticsearch.

Both require a {{RecordWriter}} to be configured (e.g. you can output the Records as JSON or other Record format such as Avro or CSV).

{{PutElasticsearchHttpRecord}} has been deprecated in favour of the {{PutElasticsearchRecord}} processor, so likely further changes won't be made to the older processor now (the replacement processor should now support all the same functionality as the older one, plus more).

The same functionality has also been included in the {{PutElasticsearchJson}} processor for those not using Record-based processors in their Flow (e.g. just have FlowFiles with single JSON Objects in that they want to index in Elasticsearch), but the processor supports Batching so can group multiple FlowFiles together into a single call to Elasticsearch for efficiency.


was (Author: chris s):
My recommendation would be to use the newer {{PutElasticearchRecord}} processor, which uses the Elasticsearch REST Client libraries for connections and has been tested against newer versions of Elasticsearch (5 - 8). This processor includes an {{errors}} relationship, which does exactly what you suggest here - i.e. output any Record(s) from the {{input}} FlowFile that caused errors when sent to Elasticsearch.

The {{successful_records}} relationship will also output the Record(s) that were successfully sent to Elasticsearch.

Both require a {{RecordWriter}} to be configured (e.g. you can output the Records as JSON or other Record format such as Avro or CSV).

{{PutElasticsearchHttpRecord}} has been deprecated in favour of the {{PutElasticsearchRecord}} processor, so likely further changes won't be made to the older processor now (the replacement processor should now support all the same functionality as the older one, plus more).

The same functionality has also been included in the {{PutElasticsearchJson}} processor for those not using Record-based processors in their Flow (e.g. just have FlowFiles with single JSON Objects in that they want to index in Elasticsearch), but the processor supports Batching so can group multiple FlowFiles together into a single call to Elasticsearch for efficiency.

> Processor PutElasticsearchHttpRecord should a relation Response
> ---------------------------------------------------------------
>
>                 Key: NIFI-5982
>                 URL: https://issues.apache.org/jira/browse/NIFI-5982
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>    Affects Versions: 1.5.0
>            Reporter: Abdou Bourakba
>            Priority: Minor
>
> When Using the processor  PutElasticsearchHttpRecord,
> Nifi send request to Elastic bulk interface. the Request include multiple lines.
> If the request fails, we have no indications which of the lines were rejected.
> However in the response that ES sends the details of successfull or failed operations are included.
> That's why I think a response relation would be useful.
> The modification is not very hard to implement, but I think it could be usefull to treat failure during the Bulk operation.  here below a exemple of implementation :
>  
> {code:java}
> // code 
> if (!isSuccess(statusCode)) {
>     ResponseBody responseBody = getResponse.body();
>     try {
>         final byte[] bodyBytes = responseBody.bytes();
>         JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes));
>         FlowFile fileResponse = session.create();
>         fileResponse = session.write(fileResponse, new OutputStreamCallback() {
>             @Override
>             public void process(final OutputStream out) throws IOException {
>                 getLogger().debug(responseJson.toString());
>                 out.write(responseJson.toString().getBytes());
>             }
>         });
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)