Posted to issues@nifi.apache.org by "Alan Jackoway (JIRA)" <ji...@apache.org> on 2016/12/15 16:24:58 UTC

[jira] [Created] (NIFI-3205) Failures Can Leave New Flow Files on Disk

Alan Jackoway created NIFI-3205:
-----------------------------------

             Summary: Failures Can Leave New Flow Files on Disk
                 Key: NIFI-3205
                 URL: https://issues.apache.org/jira/browse/NIFI-3205
             Project: Apache NiFi
          Issue Type: Bug
    Affects Versions: 1.1.0
            Reporter: Alan Jackoway


We have been hitting a situation where our content repository quickly fills the entire disk, even though content archiving is disabled and almost nothing is queued.
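One simple way to watch this growth between runs is to total the bytes stored under the content repository directory. The following is a minimal sketch, not part of NiFi: the class name RepoSize is hypothetical, and it assumes the repository is a local directory, defaulting to ./content_repository (the default value of nifi.content.repository.directory.default). A file deleted between being listed and being sized is counted as 0 bytes.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

// Totals the bytes stored under a directory tree, e.g. a NiFi content repository.
// Hypothetical helper for observing repository growth; not part of NiFi.
public class RepoSize {

    /** Sums the sizes of all regular files under dir, recursively. */
    static long totalBytes(Path dir) throws IOException {
        try (Stream<Path> paths = Files.walk(dir)) {
            return paths.filter(Files::isRegularFile)
                        .mapToLong(RepoSize::sizeOrZero)
                        .sum();
        } catch (UncheckedIOException e) {
            throw e.getCause();   // surface I/O errors from the stream as checked exceptions
        }
    }

    /** Size of one file; a file deleted between listing and sizing counts as 0 bytes. */
    private static long sizeOrZero(Path file) {
        try {
            return Files.size(file);
        } catch (NoSuchFileException e) {
            return 0L;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumes NiFi's default content repository location unless a path is given.
        Path dir = Paths.get(args.length > 0 ? args[0] : "content_repository");
        System.out.println(dir + ": " + totalBytes(dir) + " bytes");
    }
}
```

Running it before and after each failed trigger of the processor should show whether the total keeps rising.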

We believe this problem happens more often when a processor that creates many flow files fails in the middle.

To reproduce this, I wrote the test processor below and deployed it on a fresh NiFi instance, fed by a GenerateFlowFile processor producing 100 KB flow files. The processor makes 5 copies of the incoming flow file, calls session.remove on those copies, and then throws a RuntimeException. Even though the copies are removed and the session is rolled back, the content repository grows by 500 KB every time the processor runs. When NiFi is restarted, it cleans up the content repository with messages like these:
{noformat}
2016-12-15 11:17:29,774 INFO [main] o.a.n.c.repository.FileSystemRepository Found unknown file /Users/alanj/nifi-1.1.0/content_repository/1/1481818525279-1 (1126400 bytes) in File System Repository; archiving file
2016-12-15 11:17:29,778 INFO [main] o.a.n.c.repository.FileSystemRepository Found unknown file /Users/alanj/nifi-1.1.0/content_repository/2/1481818585493-2 (409600 bytes) in File System Repository; archiving file
{noformat}
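The file sizes in these log lines are consistent with the 500 KB-per-run growth. Assuming GenerateFlowFile's "100KB" means 100 × 1024 = 102,400 bytes (NiFi data sizes use binary units), both orphaned files are whole multiples of the payload: 1,126,400 bytes is 11 copies and 409,600 bytes is 4 copies, 15 copies in total, which would match three failed runs of 5 copies each. A quick check of that arithmetic (the class name is hypothetical):

```java
// Quick check of the orphaned-file sizes from the restart log.
// Assumes GenerateFlowFile's "100KB" is 100 * 1024 bytes (NiFi uses binary data units).
public class LeakArithmetic {
    static final long PAYLOAD_BYTES = 100L * 1024;  // 102,400 bytes per flow file
    static final int COPIES_PER_RUN = 5;            // copies made by CopyAndFail per run

    /** Number of payload-sized copies in a file; rejects sizes that are not exact multiples. */
    static long copiesIn(long fileBytes) {
        if (fileBytes % PAYLOAD_BYTES != 0) {
            throw new IllegalArgumentException(fileBytes + " is not a multiple of " + PAYLOAD_BYTES);
        }
        return fileBytes / PAYLOAD_BYTES;
    }

    public static void main(String[] args) {
        long[] orphanedSizes = {1126400L, 409600L};  // sizes from the two log lines
        long totalCopies = 0;
        for (long size : orphanedSizes) {
            long copies = copiesIn(size);
            System.out.println(size + " bytes = " + copies + " copies");
            totalCopies += copies;
        }
        System.out.println(totalCopies + " copies = " + (totalCopies / COPIES_PER_RUN)
            + " failed runs of " + COPIES_PER_RUN + " copies");
    }
}
```

This prints 11 and 4 copies for the two files and 3 failed runs in total.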

The test processor is the following:
{code:java}
// Copyright 2016 (c) Cloudera
package com.cloudera.edh.nifi.processors.bundles;

import com.google.common.collect.Lists;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;

import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.stream.io.StreamUtils;

/**
 * Makes 5 copies of an incoming file, then fails and rolls back.
 */
@InputRequirement(value = Requirement.INPUT_REQUIRED)
public class CopyAndFail extends AbstractProcessor {
  @Override
  public void onTrigger(ProcessContext context, ProcessSession session)
      throws ProcessException {
    FlowFile inputFile = session.get();
    if (inputFile == null) {
      context.yield();
      return;
    }
    final List<FlowFile> newFiles = Lists.newArrayList();
    
    // Copy the file 5 times (simulates us opening a zip file and unpacking its contents)
    for (int i = 0; i < 5; i++) {
      session.read(inputFile, new InputStreamCallback() {
        @Override
        public void process(InputStream inputStream) throws IOException {
          FlowFile ff = session.create(inputFile);
          ff = session.write(ff, new OutputStreamCallback() {
            @Override
            public void process(final OutputStream out) throws IOException {
              StreamUtils.copy(inputStream, out);
            }
          });
          newFiles.add(ff);
        }
      });
    }
    
    getLogger().warn("Removing the new files");
    session.remove(newFiles);

    // Simulate an error handling some file in the zip after unpacking the rest.
    // AbstractProcessor catches this, rolls back the session, and rethrows it.
    throw new RuntimeException("Simulated failure after removing the copies");
  }
}
{code}
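The invariant this report expects can be stated with a toy model: bytes written for flow files created in a session that is later rolled back should become reclaimable, even if those flow files were passed to session.remove before the failure. Everything below (ToyRepository, ToySession, the releaseRemovedOnRollback switch) is hypothetical and is not NiFi's StandardProcessSession or FileSystemRepository; the variant that skips removed claims shows only one way the reported symptom could arise, not the confirmed cause. It replays the CopyAndFail pattern: five 100 KB copies, remove them, then roll back as AbstractProcessor does when onTrigger throws.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of content-claim reference counting. All names are hypothetical;
// this is NOT how NiFi's StandardProcessSession or FileSystemRepository work.
public class ClaimLeakModel {

    /** Toy repository: claim id -> reference count; bytes are kept until the count reaches 0. */
    static final class ToyRepository {
        private final Map<Integer, Integer> refCounts = new HashMap<>();
        private final Map<Integer, Long> sizes = new HashMap<>();
        private int nextId = 0;

        int write(long bytes) {
            int id = nextId++;
            refCounts.put(id, 1);
            sizes.put(id, bytes);
            return id;
        }

        void release(int id) {
            int remaining = refCounts.merge(id, -1, Integer::sum);
            if (remaining <= 0) {            // no references left: bytes become reclaimable
                refCounts.remove(id);
                sizes.remove(id);
            }
        }

        long bytesOnDisk() {
            return sizes.values().stream().mapToLong(Long::longValue).sum();
        }
    }

    /** Toy session that remembers the claims written for flow files it created. */
    static final class ToySession {
        private final ToyRepository repo;
        private final boolean releaseRemovedOnRollback;
        private final List<Integer> live = new ArrayList<>();     // created, not removed
        private final List<Integer> removed = new ArrayList<>();  // created, then removed

        ToySession(ToyRepository repo, boolean releaseRemovedOnRollback) {
            this.repo = repo;
            this.releaseRemovedOnRollback = releaseRemovedOnRollback;
        }

        int createAndWrite(long bytes) {
            int id = repo.write(bytes);
            live.add(id);
            return id;
        }

        void remove(int id) {
            live.remove(Integer.valueOf(id));    // remove by value, not by index
            removed.add(id);
        }

        void rollback() {
            live.forEach(repo::release);
            if (releaseRemovedOnRollback) {
                removed.forEach(repo::release);  // the step the invariant requires
            }
            live.clear();
            removed.clear();
        }
    }

    /** Replays CopyAndFail once and returns the bytes still held by the toy repository. */
    static long bytesLeftAfterFailedRun(boolean releaseRemovedOnRollback) {
        ToyRepository repo = new ToyRepository();
        ToySession session = new ToySession(repo, releaseRemovedOnRollback);
        List<Integer> copies = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            copies.add(session.createAndWrite(100L * 1024));  // five 100 KB copies
        }
        copies.forEach(session::remove);  // like session.remove(newFiles)
        session.rollback();               // onTrigger threw, so the session is rolled back
        return repo.bytesOnDisk();
    }

    public static void main(String[] args) {
        System.out.println("release removed claims: " + bytesLeftAfterFailedRun(true) + " bytes left");
        System.out.println("skip removed claims:    " + bytesLeftAfterFailedRun(false) + " bytes left");
    }
}
```

With the release step the toy repository ends at 0 bytes; without it, 512,000 bytes (5 × 102,400) remain after each failed run, the same 500 KB per run described in this report.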



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)