You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by "Mark Payne (JIRA)" <ji...@apache.org> on 2018/12/06 21:21:00 UTC

[jira] [Created] (NIFI-5879) ContentNotFoundException thrown if a FlowFile's content claim is read, then written to, then read again, within the same ProcessSession

Mark Payne created NIFI-5879:
--------------------------------

             Summary: ContentNotFoundException thrown if a FlowFile's content claim is read, then written to, then read again, within the same ProcessSession
                 Key: NIFI-5879
                 URL: https://issues.apache.org/jira/browse/NIFI-5879
             Project: Apache NiFi
          Issue Type: Bug
          Components: Core Framework
            Reporter: Mark Payne
            Assignee: Mark Payne
             Fix For: 1.9.0


The following Processor can be used to replicate the issue.

If a processor reads a FlowFile's content, then writes to that content, and then reads what was just written, all within the same ProcessSession, a ContentNotFoundException is thrown.

 

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.standard;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.stream.io.StreamUtils;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
import static org.apache.nifi.processor.util.StandardValidators.POSITIVE_INTEGER_VALIDATOR;

/**
 * Reproduction processor for NIFI-5879.
 *
 * <p>For each incoming FlowFile the processor reads the content once, then creates
 * the configured number of clones. Each clone is, for the configured number of write
 * iterations, optionally read, then overwritten with the bytes of {@code "boom"}, then
 * rewritten again by copying its own content through {@link StreamUtils#copy}. The
 * original and all clones are transferred to {@link #REL_SUCCESS}.
 */
public class ReplicateWeirdness extends AbstractProcessor {

    /** Number of clones created from each incoming FlowFile. */
    static final PropertyDescriptor CLONE_ITERATIONS = new Builder()
            .name("Iterations")
            .displayName("Iterations")
            .description("Number of Iterations")
            .required(true)
            .addValidator(POSITIVE_INTEGER_VALIDATOR)
            .expressionLanguageSupported(NONE)
            .defaultValue("1")
            .build();

    /** Number of (optional read, write, copy) rounds applied to each clone. */
    static final PropertyDescriptor WRITE_ITERATIONS = new Builder()
            .name("Write Iterations")
            .displayName("Write Iterations")
            .description("Write Iterations")
            .required(true)
            .addValidator(POSITIVE_INTEGER_VALIDATOR)
            .expressionLanguageSupported(NONE)
            .defaultValue("2")
            .build();

    /** Whether each clone is read before it is written in every write iteration. */
    static final PropertyDescriptor READ_FIRST = new Builder()
            .name("Read First")
            .displayName("Read First")
            .description("Read First")
            .required(true)
            .allowableValues("true", "false")
            .expressionLanguageSupported(NONE)
            .defaultValue("false")
            .build();

    /** The only relationship; the original FlowFile and every clone are routed here. */
    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .build();

    /**
     * @return a singleton set containing {@link #REL_SUCCESS}
     */
    @Override
    public Set<Relationship> getRelationships() {
        return Collections.singleton(REL_SUCCESS);
    }

    /**
     * @return the three supported properties, in the order
     *         {@link #CLONE_ITERATIONS}, {@link #WRITE_ITERATIONS}, {@link #READ_FIRST}
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(CLONE_ITERATIONS);
        properties.add(WRITE_ITERATIONS);
        properties.add(READ_FIRST);
        return properties;
    }

    /**
     * Runs the read / clone / write / copy sequence described on the class.
     *
     * @param context supplies the configured property values
     * @param session the session used for every read, clone, write and transfer
     * @throws ProcessException wrapping any {@link IOException} raised while reading content
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile original = session.get();
        if (original == null) {
            return;
        }

        // Initial read of the original's content, before any clone is created.
        try (final InputStream in = session.read(original)) {
            final long originalLength = countBytes(in);
            getLogger().info("Original FlowFile is " + originalLength + " bytes");
        } catch (final IOException e) {
            throw new ProcessException(e);
        }

        final int cloneIterations = context.getProperty(CLONE_ITERATIONS).asInteger();
        final int writeIterations = context.getProperty(WRITE_ITERATIONS).asInteger();
        final boolean readFirst = context.getProperty(READ_FIRST).asBoolean();

        for (int i = 0; i < cloneIterations; i++) {
            FlowFile clone = session.clone(original);

            for (int w = 0; w < writeIterations; w++) {
                if (readFirst) {
                    try (InputStream in = session.read(clone)) {
                        final long len = countBytes(in);
                        getLogger().info("Read " + len + " bytes");
                    } catch (IOException e) {
                        throw new ProcessException(e);
                    }
                }

                // Explicit charset so the written bytes do not depend on the platform default.
                clone = session.write(clone, out -> out.write("boom".getBytes(StandardCharsets.UTF_8)));
                // Second write reads back what was just written and copies it to new content.
                clone = session.write(clone, StreamUtils::copy);
            }

            session.transfer(clone, REL_SUCCESS);
        }

        session.transfer(original, REL_SUCCESS);
    }

    /**
     * Consumes the stream to its end and counts the bytes read.
     *
     * @param in the stream to drain; it is not closed by this method
     * @return the number of bytes read before end-of-stream
     * @throws IOException if reading from the stream fails
     */
    private long countBytes(final InputStream in) throws IOException {
        // Accumulate in a long: an int counter would overflow for content larger than 2 GiB.
        long len = 0L;
        while (in.read() >= 0) {
            len++;
        }

        return len;
    }
}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)