Posted to dev@nifi.apache.org by M Singh <ma...@yahoo.com.INVALID> on 2016/02/10 17:41:55 UTC
Nifi - Adding attributes to flow file results in
FlowFileHandlingException when flow file is transferred
Hi:
I am processing some flow files and want to add success and failure attributes to the processed flow file and then transfer it. But this is producing an exception:
org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=432bc163-28a0-4d08-b9e8-1674a649ae8c,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1455121175949-1, container=default, section=1], offset=0, length=1],offset=0,name=129790440390423,size=1] is not known in this session (StandardProcessSession[id=70])
    at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:2361) ~[nifi-framework-core-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
Here is the code segment in the onTrigger method:
Note: if I comment out the lines where I add attributes to the flow file (as shown below), it works. If I uncomment them, whether adding a single attribute or several, the exception is thrown.
try {
    List<Record> records = new ArrayList<>();

    // Prepare batch of records
    for (int i = 0; i < flowFiles.size(); i++) {
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        session.exportTo(flowFiles.get(i), baos);
        records.add(new Record().withData(ByteBuffer.wrap(baos.toByteArray())));
    }

    // Send the batch
    PutRecordBatchRequest putRecordBatchRequest = new PutRecordBatchRequest();
    putRecordBatchRequest.setDeliveryStreamName(streamName);
    putRecordBatchRequest.setRecords(records);
    PutRecordBatchResult results = client.putRecordBatch(putRecordBatchRequest);

    // Separate out the successful and failed flow files
    List<PutRecordBatchResponseEntry> responseEntries = results.getRequestResponses();
    List<FlowFile> failedFlowFiles = new ArrayList<>();
    List<FlowFile> successfulFlowFiles = new ArrayList<>();
    for (int i = 0; i < responseEntries.size(); i++) {
        PutRecordBatchResponseEntry entry = responseEntries.get(i);
        FlowFile flowFile = flowFiles.get(i);

        Map<String,String> attributes = new HashMap<>();
        attributes.put(RECORD_ID, entry.getRecordId());
        // NOTE - If I uncomment this line - or any other which adds attributes to the flowfile - I get the exception
        // session.putAttribute(flowFile, RECORD_ID, entry.getRecordId());
        if (!StringUtils.isBlank(entry.getErrorCode())) {
            attributes.put(ERROR_CODE, entry.getErrorCode());
            attributes.put(ERROR_MESSAGE, entry.getErrorMessage());
            // session.putAllAttributes(flowFile, attributes);
            failedFlowFiles.add(flowFile);
        } else {
            // session.putAllAttributes(flowFile, attributes);
            successfulFlowFiles.add(flowFile);
        }
    }

    if (failedFlowFiles.size() > 0) {
        session.transfer(failedFlowFiles, REL_FAILURE);
        getLogger().error("Failed to send {} records {}", new Object[]{stream, failedFlowFiles});
    }

    if (successfulFlowFiles.size() > 0) {
        // Throws exception when attributes are added to flow files
        session.transfer(successfulFlowFiles, REL_SUCCESS);
        getLogger().info("Success sent {} records {}", new Object[]{stream, successfulFlowFiles});
    }

    records.clear();
Re: Nifi - Adding attributes to flow file results in
FlowFileHandlingException when flow file is transferred
Posted by M Singh <ma...@yahoo.com.INVALID>.
Bryan :
That was indeed the issue. Thanks for your quick response.
Mans
On Wednesday, February 10, 2016 8:59 AM, Bryan Bende <bb...@gmail.com> wrote:
Hello,
The error message indicates that you are trying to transfer an unknown
FlowFile: the code transfers the reference to the original FlowFile,
obtained before the attributes were updated. FlowFiles are immutable, so
putAllAttributes (or putAttribute) returns a new FlowFile reference. You
need to assign that result and then transfer it:
flowFile = session.putAllAttributes(flowFile, attributes);
Thanks,
Bryan
Re: Nifi - Adding attributes to flow file results in
FlowFileHandlingException when flow file is transferred
Posted by Bryan Bende <bb...@gmail.com>.
Hello,
The error message indicates that you are trying to transfer an unknown
FlowFile: the code transfers the reference to the original FlowFile,
obtained before the attributes were updated. FlowFiles are immutable, so
putAllAttributes (or putAttribute) returns a new FlowFile reference. You
need to assign that result and then transfer it:
flowFile = session.putAllAttributes(flowFile, attributes);
Thanks,
Bryan
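For anyone hitting the same error, here is a minimal, self-contained sketch of why discarding the return value fails. It does not use NiFi at all: ToyFlowFile and ToySession are hypothetical stand-ins that only mimic the behaviour described in this thread (an immutable flow file, and a session that accepts only the latest reference it handed out). The attribute key "record.id" is likewise made up for illustration, and the real StandardProcessSession internals may well differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins, NOT NiFi classes. They only model the
// "immutable flow file, session tracks the latest version" idea.
class StaleReferenceDemo {

    /** Immutable holder of attributes; every update yields a new instance. */
    static final class ToyFlowFile {
        final long id;
        final Map<String, String> attributes;

        ToyFlowFile(long id, Map<String, String> attributes) {
            this.id = id;
            this.attributes = Collections.unmodifiableMap(new HashMap<>(attributes));
        }
    }

    /** Remembers the most recent ToyFlowFile instance per id. */
    static final class ToySession {
        private final Map<Long, ToyFlowFile> current = new HashMap<>();
        final List<ToyFlowFile> transferred = new ArrayList<>();

        ToyFlowFile create(long id) {
            ToyFlowFile flowFile = new ToyFlowFile(id, new HashMap<>());
            current.put(id, flowFile);
            return flowFile;
        }

        /** Returns a NEW instance; the argument becomes stale afterwards. */
        ToyFlowFile putAllAttributes(ToyFlowFile flowFile, Map<String, String> attrs) {
            validate(flowFile);
            Map<String, String> merged = new HashMap<>(flowFile.attributes);
            merged.putAll(attrs);
            ToyFlowFile updated = new ToyFlowFile(flowFile.id, merged);
            current.put(flowFile.id, updated);
            return updated;
        }

        void transfer(ToyFlowFile flowFile) {
            validate(flowFile);
            transferred.add(flowFile);
        }

        // Rejects any reference that is not the latest one for its id.
        private void validate(ToyFlowFile flowFile) {
            if (current.get(flowFile.id) != flowFile) {
                throw new IllegalStateException(
                        "flow file " + flowFile.id + " is not known in this session");
            }
        }
    }

    /** Mirrors the failing code: the updated reference is thrown away. */
    static boolean transferIgnoringReturnValue() {
        ToySession session = new ToySession();
        ToyFlowFile flowFile = session.create(1L);
        Map<String, String> attrs = new HashMap<>();
        attrs.put("record.id", "abc");
        session.putAllAttributes(flowFile, attrs); // result discarded
        try {
            session.transfer(flowFile); // stale reference
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    /** Mirrors the fix: reassign the reference before transferring. */
    static boolean transferUsingReturnValue() {
        ToySession session = new ToySession();
        ToyFlowFile flowFile = session.create(1L);
        Map<String, String> attrs = new HashMap<>();
        attrs.put("record.id", "abc");
        flowFile = session.putAllAttributes(flowFile, attrs);
        try {
            session.transfer(flowFile);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("ignoring return value succeeds: " + transferIgnoringReturnValue());
        System.out.println("using return value succeeds: " + transferUsingReturnValue());
    }
}
```

In the processor from the original post, the equivalent change would be to write flowFile = session.putAllAttributes(flowFile, attributes); inside the loop before adding flowFile to failedFlowFiles or successfulFlowFiles, so that the lists hold the updated references.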