You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by "Matt Burgess (JIRA)" <ji...@apache.org> on 2019/01/07 16:09:00 UTC

[jira] [Updated] (NIFI-5879) ContentNotFoundException thrown if a FlowFile's content claim is read, then written to, then read again, within the same ProcessSession

     [ https://issues.apache.org/jira/browse/NIFI-5879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matt Burgess updated NIFI-5879:
-------------------------------
    Resolution: Fixed
        Status: Resolved  (was: Patch Available)

> ContentNotFoundException thrown if a FlowFile's content claim is read, then written to, then read again, within the same ProcessSession
> ---------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: NIFI-5879
>                 URL: https://issues.apache.org/jira/browse/NIFI-5879
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Core Framework
>            Reporter: Mark Payne
>            Assignee: Mark Payne
>            Priority: Major
>             Fix For: 1.9.0
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> The following Processor can be used to replicate the issue.
> If a processor reads content, then attempts to write to the content, then read what was just written, a ContentNotFoundException will be thrown.
>  
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one or more
>  * contributor license agreements. See the NOTICE file distributed with
>  * this work for additional information regarding copyright ownership.
>  * The ASF licenses this file to You under the Apache License, Version 2.0
>  * (the "License"); you may not use this file except in compliance with
>  * the License. You may obtain a copy of the License at
>  *
>  * http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> package org.apache.nifi.processors.standard;
> import org.apache.nifi.components.PropertyDescriptor;
> import org.apache.nifi.components.PropertyDescriptor.Builder;
> import org.apache.nifi.flowfile.FlowFile;
> import org.apache.nifi.processor.AbstractProcessor;
> import org.apache.nifi.processor.ProcessContext;
> import org.apache.nifi.processor.ProcessSession;
> import org.apache.nifi.processor.Relationship;
> import org.apache.nifi.processor.exception.ProcessException;
> import org.apache.nifi.stream.io.StreamUtils;
> import java.io.IOException;
> import java.io.InputStream;
> import java.util.ArrayList;
> import java.util.Collections;
> import java.util.List;
> import java.util.Set;
> import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
> import static org.apache.nifi.processor.util.StandardValidators.POSITIVE_INTEGER_VALIDATOR;
> public class ReplicateWeirdness extends AbstractProcessor {
>  static final PropertyDescriptor CLONE_ITERATIONS = new Builder()
>  .name("Iterations")
>  .displayName("Iterations")
>  .description("Number of Iterations")
>  .required(true)
>  .addValidator(POSITIVE_INTEGER_VALIDATOR)
>  .expressionLanguageSupported(NONE)
>  .defaultValue("1")
>  .build();
>  static final PropertyDescriptor WRITE_ITERATIONS = new Builder()
>  .name("Write Iterations")
>  .displayName("Write Iterations")
>  .description("Write Iterations")
>  .required(true)
>  .addValidator(POSITIVE_INTEGER_VALIDATOR)
>  .expressionLanguageSupported(NONE)
>  .defaultValue("2")
>  .build();
>  static final PropertyDescriptor READ_FIRST = new Builder()
>  .name("Read First")
>  .displayName("Read First")
>  .description("Read First")
>  .required(true)
>  .allowableValues("true", "false")
>  .expressionLanguageSupported(NONE)
>  .defaultValue("false")
>  .build();
>  static final Relationship REL_SUCCESS = new Relationship.Builder()
>  .name("success")
>  .build();
>  @Override
>  public Set<Relationship> getRelationships() {
>  return Collections.singleton(REL_SUCCESS);
>  }
>  @Override
>  protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
>  final List<PropertyDescriptor> properties = new ArrayList<>();
>  properties.add(CLONE_ITERATIONS);
>  properties.add(WRITE_ITERATIONS);
>  properties.add(READ_FIRST);
>  return properties;
>  }
>  @Override
>  public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
>  FlowFile original = session.get();
>  if (original == null) {
>  return;
>  }
>  try (final InputStream in = session.read(original)) {
>  final long originalLength = countBytes(in);
>  getLogger().info("Original FlowFile is " + originalLength + " bytes");
>  } catch (final IOException e) {
>  throw new ProcessException(e);
>  }
>  final int cloneIterations = context.getProperty(CLONE_ITERATIONS).asInteger();
>  final int writeIterations = context.getProperty(WRITE_ITERATIONS).asInteger();
>  final boolean readFirst = context.getProperty(READ_FIRST).asBoolean();
>  for (int i=0; i < cloneIterations; i++) {
>  FlowFile clone = session.clone(original);
>  for (int w = 0; w < writeIterations; w++) {
>  if (readFirst) {
>  try (InputStream in = session.read(clone)) {
>  final long len = countBytes(in);
>  getLogger().info("Read " + len + " bytes");
>  } catch (IOException e) {
>  throw new ProcessException(e);
>  }
>  }
>  clone = session.write(clone, out -> out.write("boom".getBytes()));
>  clone = session.write(clone, StreamUtils::copy);
>  }
>  session.transfer(clone, REL_SUCCESS);
>  }
>  session.transfer(original, REL_SUCCESS);
>  }
>  private long countBytes(final InputStream in) throws IOException {
>  int len = 0;
>  while (in.read() >= 0) {
>  len++;
>  }
>  return len;
>  }
> }



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)