You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nifi.apache.org by M Singh <ma...@yahoo.com.INVALID> on 2016/02/10 17:41:55 UTC

Nifi - Adding attributes to flow file results in FlowFileHandlingException when flow file is transferred

Hi:
I am processing some flow files and want to add success and failure attributes to the processed flow file and then transfer it.  But this is producing an exception:
org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=432bc163-28a0-4d08-b9e8-1674a649ae8c,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1455121175949-1, container=default, section=1], offset=0, length=1],offset=0,name=129790440390423,size=1] is not known in this session (StandardProcessSession[id=70]) at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:2361) ~[nifi-framework-core-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
Here is the code segment in the onTrigger method:
Note - If I comment out the lines (as shown below) where I tried to add attributes to the flow file and it works.  If I uncomment the lines (either adding single attributes or multiple, the exception is produced)
        try {            List<Record> records = new ArrayList<>();
            // Prepare batch of records            for (int i = 0; i < flowFiles.size(); i++) {                final ByteArrayOutputStream baos = new ByteArrayOutputStream();                session.exportTo(flowFiles.get(i), baos);                records.add(new Record().withData(ByteBuffer.wrap(baos.toByteArray())));            }
            // Send the batch            PutRecordBatchRequest putRecordBatchRequest = new PutRecordBatchRequest();            putRecordBatchRequest.setDeliveryStreamName(streamName);            putRecordBatchRequest.setRecords(records);            PutRecordBatchResult results = client.putRecordBatch(putRecordBatchRequest);
            // Separate out the successful and failed flow files            List<PutRecordBatchResponseEntry> responseEntries = results.getRequestResponses();            List<FlowFile> failedFlowFiles = new ArrayList<>();            List<FlowFile> successfulFlowFiles = new ArrayList<>();            for (int i = 0; i < responseEntries.size(); i++ ) {                PutRecordBatchResponseEntry entry = responseEntries.get(i);                FlowFile flowFile = flowFiles.get(i);
                Map<String,String> attributes = new HashMap<>();                attributes.put(RECORD_ID, entry.getRecordId());// NOTE - If I uncomment this line - or any other which adds attributes to the flowfile - i get the exception//                session.putAttribute(flowFile,RECORD_ID, entry.getRecordId());                if ( ! StringUtils.isBlank(entry.getErrorCode()) ) {                    attributes.put(ERROR_CODE, entry.getErrorCode());                    attributes.put(ERROR_MESSAGE, entry.getErrorMessage());//                    session.putAllAttributes(flowFile, attributes);                    failedFlowFiles.add(flowFile);                } else {//                    session.putAllAttributes(flowFile, attributes);                    successfulFlowFiles.add(flowFile);                }            }
            if ( failedFlowFiles.size() > 0 ) {                session.transfer(failedFlowFiles, REL_FAILURE);                getLogger().error("Failed to send {} records {}", new Object[]{stream, failedFlowFiles});            }
            if ( successfulFlowFiles.size() > 0 ) {// Throws exception when attributes are added to flow files                session.transfer(successfulFlowFiles, REL_SUCCESS);                getLogger().info("Success sent {} records {}", new Object[]{stream, successfulFlowFiles});            }
            records.clear();

Re: Nifi - Adding attributes to flow file results in FlowFileHandlingException when flow file is transferred

Posted by M Singh <ma...@yahoo.com.INVALID>.
Bryan : 
That was indeed the issue.  Thanks for your quick response.
Mans 

    On Wednesday, February 10, 2016 8:59 AM, Bryan Bende <bb...@gmail.com> wrote:
 

 Hello,

The error message is indicating that you are trying to transfer an unknown
FlowFile because it is transferring a reference to the original FlowFile
before you updated the attributes. You would need to assign the result of
putAllAttributes (or putAttribute) and then transfer that:

flowFile = session.putAllAttributes(flowFile, attributes);

Thanks,

Bryan

On Wed, Feb 10, 2016 at 11:41 AM, M Singh <ma...@yahoo.com.invalid>
wrote:

> Hi:
> I am processing some flow files and want to add success and failure
> attributes to the processed flow file and then transfer it.  But this is
> producing an exception:
> org.apache.nifi.processor.exception.FlowFileHandlingException:
> StandardFlowFileRecord[uuid=432bc163-28a0-4d08-b9e8-1674a649ae8c,claim=StandardContentClaim
> [resourceClaim=StandardResourceClaim[id=1455121175949-1, container=default,
> section=1], offset=0, length=1],offset=0,name=129790440390423,size=1] is
> not known in this session (StandardProcessSession[id=70]) at
> org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:2361)
> ~[nifi-framework-core-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
> Here is the code segment in the onTrigger method:
> Note - If I comment out the lines (as shown below) where I tried to add
> attributes to the flow file and it works.  If I uncomment the lines (either
> adding single attributes or multiple, the exception is produced)
>        try {            List<Record> records = new ArrayList<>();
>            // Prepare batch of records            for (int i = 0; i <
> flowFiles.size(); i++) {                final ByteArrayOutputStream baos =
> new ByteArrayOutputStream();
> session.exportTo(flowFiles.get(i), baos);                records.add(new
> Record().withData(ByteBuffer.wrap(baos.toByteArray())));            }
>            // Send the batch            PutRecordBatchRequest
> putRecordBatchRequest = new PutRecordBatchRequest();
> putRecordBatchRequest.setDeliveryStreamName(streamName);
> putRecordBatchRequest.setRecords(records);            PutRecordBatchResult
> results = client.putRecordBatch(putRecordBatchRequest);
>            // Separate out the successful and failed flow files
>  List<PutRecordBatchResponseEntry> responseEntries =
> results.getRequestResponses();            List<FlowFile> failedFlowFiles =
> new ArrayList<>();            List<FlowFile> successfulFlowFiles = new
> ArrayList<>();            for (int i = 0; i < responseEntries.size(); i++ )
> {                PutRecordBatchResponseEntry entry =
> responseEntries.get(i);                FlowFile flowFile = flowFiles.get(i);
>                Map<String,String> attributes = new HashMap<>();
>      attributes.put(RECORD_ID, entry.getRecordId());// NOTE - If I
> uncomment this line - or any other which adds attributes to the flowfile -
> i get the exception//
> session.putAttribute(flowFile,RECORD_ID, entry.getRecordId());
>  if ( ! StringUtils.isBlank(entry.getErrorCode()) ) {
> attributes.put(ERROR_CODE, entry.getErrorCode());
> attributes.put(ERROR_MESSAGE, entry.getErrorMessage());//
>  session.putAllAttributes(flowFile, attributes);
> failedFlowFiles.add(flowFile);                } else {//
> session.putAllAttributes(flowFile, attributes);
> successfulFlowFiles.add(flowFile);                }            }
>            if ( failedFlowFiles.size() > 0 ) {
> session.transfer(failedFlowFiles, REL_FAILURE);
> getLogger().error("Failed to send {} records {}", new Object[]{stream,
> failedFlowFiles});            }
>            if ( successfulFlowFiles.size() > 0 ) {// Throws exception
> when attributes are added to flow files
> session.transfer(successfulFlowFiles, REL_SUCCESS);
> getLogger().info("Success sent {} records {}", new Object[]{stream,
> successfulFlowFiles});            }
>            records.clear();


  

Re: Nifi - Adding attributes to flow file results in FlowFileHandlingException when flow file is transferred

Posted by Bryan Bende <bb...@gmail.com>.
Hello,

The error message is indicating that you are trying to transfer an unknown
FlowFile because it is transferring a reference to the original FlowFile
before you updated the attributes. You would need to assign the result of
putAllAttributes (or putAttribute) and then transfer that:

flowFile = session.putAllAttributes(flowFile, attributes);

Thanks,

Bryan

On Wed, Feb 10, 2016 at 11:41 AM, M Singh <ma...@yahoo.com.invalid>
wrote:

> Hi:
> I am processing some flow files and want to add success and failure
> attributes to the processed flow file and then transfer it.  But this is
> producing an exception:
> org.apache.nifi.processor.exception.FlowFileHandlingException:
> StandardFlowFileRecord[uuid=432bc163-28a0-4d08-b9e8-1674a649ae8c,claim=StandardContentClaim
> [resourceClaim=StandardResourceClaim[id=1455121175949-1, container=default,
> section=1], offset=0, length=1],offset=0,name=129790440390423,size=1] is
> not known in this session (StandardProcessSession[id=70]) at
> org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:2361)
> ~[nifi-framework-core-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
> Here is the code segment in the onTrigger method:
> Note - If I comment out the lines (as shown below) where I tried to add
> attributes to the flow file and it works.  If I uncomment the lines (either
> adding single attributes or multiple, the exception is produced)
>         try {            List<Record> records = new ArrayList<>();
>             // Prepare batch of records            for (int i = 0; i <
> flowFiles.size(); i++) {                final ByteArrayOutputStream baos =
> new ByteArrayOutputStream();
> session.exportTo(flowFiles.get(i), baos);                records.add(new
> Record().withData(ByteBuffer.wrap(baos.toByteArray())));            }
>             // Send the batch            PutRecordBatchRequest
> putRecordBatchRequest = new PutRecordBatchRequest();
> putRecordBatchRequest.setDeliveryStreamName(streamName);
> putRecordBatchRequest.setRecords(records);            PutRecordBatchResult
> results = client.putRecordBatch(putRecordBatchRequest);
>             // Separate out the successful and failed flow files
>   List<PutRecordBatchResponseEntry> responseEntries =
> results.getRequestResponses();            List<FlowFile> failedFlowFiles =
> new ArrayList<>();            List<FlowFile> successfulFlowFiles = new
> ArrayList<>();            for (int i = 0; i < responseEntries.size(); i++ )
> {                PutRecordBatchResponseEntry entry =
> responseEntries.get(i);                FlowFile flowFile = flowFiles.get(i);
>                 Map<String,String> attributes = new HashMap<>();
>       attributes.put(RECORD_ID, entry.getRecordId());// NOTE - If I
> uncomment this line - or any other which adds attributes to the flowfile -
> i get the exception//
> session.putAttribute(flowFile,RECORD_ID, entry.getRecordId());
>   if ( ! StringUtils.isBlank(entry.getErrorCode()) ) {
> attributes.put(ERROR_CODE, entry.getErrorCode());
> attributes.put(ERROR_MESSAGE, entry.getErrorMessage());//
>   session.putAllAttributes(flowFile, attributes);
> failedFlowFiles.add(flowFile);                } else {//
> session.putAllAttributes(flowFile, attributes);
> successfulFlowFiles.add(flowFile);                }            }
>             if ( failedFlowFiles.size() > 0 ) {
> session.transfer(failedFlowFiles, REL_FAILURE);
> getLogger().error("Failed to send {} records {}", new Object[]{stream,
> failedFlowFiles});            }
>             if ( successfulFlowFiles.size() > 0 ) {// Throws exception
> when attributes are added to flow files
> session.transfer(successfulFlowFiles, REL_SUCCESS);
> getLogger().info("Success sent {} records {}", new Object[]{stream,
> successfulFlowFiles});            }
>             records.clear();