You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by "Mark Payne (JIRA)" <ji...@apache.org> on 2018/12/06 21:24:01 UTC
[jira] [Updated] (NIFI-5879) ContentNotFoundException thrown if a
FlowFile's content claim is read, then written to, then read again, within
the same ProcessSession
[ https://issues.apache.org/jira/browse/NIFI-5879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mark Payne updated NIFI-5879:
-----------------------------
Status: Patch Available (was: Open)
> ContentNotFoundException thrown if a FlowFile's content claim is read, then written to, then read again, within the same ProcessSession
> ---------------------------------------------------------------------------------------------------------------------------------------
>
> Key: NIFI-5879
> URL: https://issues.apache.org/jira/browse/NIFI-5879
> Project: Apache NiFi
> Issue Type: Bug
> Components: Core Framework
> Reporter: Mark Payne
> Assignee: Mark Payne
> Priority: Major
> Fix For: 1.9.0
>
>
> The following Processor can be used to replicate the issue.
> If a processor reads a FlowFile's content, then writes to that content, and then reads what was just written, all within the same ProcessSession, a ContentNotFoundException is thrown.
>
> /*
> * Licensed to the Apache Software Foundation (ASF) under one or more
> * contributor license agreements. See the NOTICE file distributed with
> * this work for additional information regarding copyright ownership.
> * The ASF licenses this file to You under the Apache License, Version 2.0
> * (the "License"); you may not use this file except in compliance with
> * the License. You may obtain a copy of the License at
> *
> * http://www.apache.org/licenses/LICENSE-2.0
> *
> * Unless required by applicable law or agreed to in writing, software
> * distributed under the License is distributed on an "AS IS" BASIS,
> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> * See the License for the specific language governing permissions and
> * limitations under the License.
> */
> package org.apache.nifi.processors.standard;
> import org.apache.nifi.components.PropertyDescriptor;
> import org.apache.nifi.components.PropertyDescriptor.Builder;
> import org.apache.nifi.flowfile.FlowFile;
> import org.apache.nifi.processor.AbstractProcessor;
> import org.apache.nifi.processor.ProcessContext;
> import org.apache.nifi.processor.ProcessSession;
> import org.apache.nifi.processor.Relationship;
> import org.apache.nifi.processor.exception.ProcessException;
> import org.apache.nifi.stream.io.StreamUtils;
> import java.io.IOException;
> import java.io.InputStream;
> import java.nio.charset.StandardCharsets;
> import java.util.ArrayList;
> import java.util.Collections;
> import java.util.List;
> import java.util.Set;
> import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
> import static org.apache.nifi.processor.util.StandardValidators.POSITIVE_INTEGER_VALIDATOR;
> /**
>  * Reproducer processor for NIFI-5879: within a single ProcessSession it reads a
>  * FlowFile's content, writes to it, and then reads the freshly written content again.
>  *
>  * <p>The order of the session calls in {@link #onTrigger} is what triggers the reported
>  * ContentNotFoundException; do not reorder them.</p>
>  */
> public class ReplicateWeirdness extends AbstractProcessor {
>
>     /** How many clones of the incoming FlowFile are created per trigger (default 1). */
>     static final PropertyDescriptor CLONE_ITERATIONS = new Builder()
>             .name("Iterations")
>             .displayName("Iterations")
>             .description("Number of Iterations")
>             .required(true)
>             .addValidator(POSITIVE_INTEGER_VALIDATOR)
>             .expressionLanguageSupported(NONE)
>             .defaultValue("1")
>             .build();
>
>     /** How many write/copy rounds are applied to each clone (default 2). */
>     static final PropertyDescriptor WRITE_ITERATIONS = new Builder()
>             .name("Write Iterations")
>             .displayName("Write Iterations")
>             .description("Write Iterations")
>             .required(true)
>             .addValidator(POSITIVE_INTEGER_VALIDATOR)
>             .expressionLanguageSupported(NONE)
>             .defaultValue("2")
>             .build();
>
>     /** Whether each clone is read before every write round (default false). */
>     static final PropertyDescriptor READ_FIRST = new Builder()
>             .name("Read First")
>             .displayName("Read First")
>             .description("Read First")
>             .required(true)
>             .allowableValues("true", "false")
>             .expressionLanguageSupported(NONE)
>             .defaultValue("false")
>             .build();
>
>     /** The only relationship; the original and every clone are routed here. */
>     static final Relationship REL_SUCCESS = new Relationship.Builder()
>             .name("success")
>             .build();
>
>     /**
>      * @return a singleton set containing {@link #REL_SUCCESS}
>      */
>     @Override
>     public Set<Relationship> getRelationships() {
>         return Collections.singleton(REL_SUCCESS);
>     }
>
>     /**
>      * @return the three supported properties, in display order
>      */
>     @Override
>     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
>         final List<PropertyDescriptor> properties = new ArrayList<>();
>         properties.add(CLONE_ITERATIONS);
>         properties.add(WRITE_ITERATIONS);
>         properties.add(READ_FIRST);
>         return properties;
>     }
>
>     /**
>      * Reads the incoming FlowFile, then creates the configured number of clones and, for
>      * each clone, repeatedly (optionally reads it,) overwrites it with "boom" and copies
>      * that content onto itself. The original and all clones are transferred to success.
>      *
>      * @param context supplies the configured property values
>      * @param session the session in which all reads, writes and transfers happen
>      * @throws ProcessException wrapping any IOException raised while reading content
>      */
>     @Override
>     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
>         final FlowFile original = session.get();
>         if (original == null) {
>             return;
>         }
>
>         // Step 1 of the reproduction: read the original content inside this session.
>         try (final InputStream in = session.read(original)) {
>             final long originalLength = countBytes(in);
>             getLogger().info("Original FlowFile is " + originalLength + " bytes");
>         } catch (final IOException e) {
>             throw new ProcessException(e);
>         }
>
>         final int cloneIterations = context.getProperty(CLONE_ITERATIONS).asInteger();
>         final int writeIterations = context.getProperty(WRITE_ITERATIONS).asInteger();
>         final boolean readFirst = context.getProperty(READ_FIRST).asBoolean();
>
>         for (int i = 0; i < cloneIterations; i++) {
>             FlowFile clone = session.clone(original);
>
>             for (int w = 0; w < writeIterations; w++) {
>                 if (readFirst) {
>                     try (final InputStream in = session.read(clone)) {
>                         final long len = countBytes(in);
>                         getLogger().info("Read " + len + " bytes");
>                     } catch (final IOException e) {
>                         throw new ProcessException(e);
>                     }
>                 }
>
>                 // Step 2: replace the clone's content. The charset is explicit so the
>                 // bytes written do not depend on the platform default.
>                 clone = session.write(clone, out -> out.write("boom".getBytes(StandardCharsets.UTF_8)));
>
>                 // Step 3: copy the content stream to the output stream, i.e. read back
>                 // what was just written within the same session.
>                 clone = session.write(clone, StreamUtils::copy);
>             }
>
>             session.transfer(clone, REL_SUCCESS);
>         }
>
>         session.transfer(original, REL_SUCCESS);
>     }
>
>     /**
>      * Consumes the stream to its end and counts the bytes read.
>      *
>      * @param in the stream to drain; it is not closed here
>      * @return the number of bytes read before end of stream
>      * @throws IOException if reading from the stream fails
>      */
>     private long countBytes(final InputStream in) throws IOException {
>         // Count in a long: an int counter would wrap for content above Integer.MAX_VALUE bytes.
>         long len = 0L;
>         while (in.read() >= 0) {
>             len++;
>         }
>         return len;
>     }
> }
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)