You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by "Mark Payne (JIRA)" <ji...@apache.org> on 2018/12/06 21:21:00 UTC
[jira] [Created] (NIFI-5879) ContentNotFoundException thrown if a
FlowFile's content claim is read, then written to, then read again, within
the same ProcessSession
Mark Payne created NIFI-5879:
--------------------------------
Summary: ContentNotFoundException thrown if a FlowFile's content claim is read, then written to, then read again, within the same ProcessSession
Key: NIFI-5879
URL: https://issues.apache.org/jira/browse/NIFI-5879
Project: Apache NiFi
Issue Type: Bug
Components: Core Framework
Reporter: Mark Payne
Assignee: Mark Payne
Fix For: 1.9.0
The following Processor can be used to replicate the issue.
If a processor reads content, then attempts to write to the content, then read what was just written, a ContentNotFoundException will be thrown.
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.stream.io.StreamUtils;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
import static org.apache.nifi.processor.util.StandardValidators.POSITIVE_INTEGER_VALIDATOR;
public class ReplicateWeirdness extends AbstractProcessor {
static final PropertyDescriptor CLONE_ITERATIONS = new Builder()
.name("Iterations")
.displayName("Iterations")
.description("Number of Iterations")
.required(true)
.addValidator(POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(NONE)
.defaultValue("1")
.build();
static final PropertyDescriptor WRITE_ITERATIONS = new Builder()
.name("Write Iterations")
.displayName("Write Iterations")
.description("Write Iterations")
.required(true)
.addValidator(POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(NONE)
.defaultValue("2")
.build();
static final PropertyDescriptor READ_FIRST = new Builder()
.name("Read First")
.displayName("Read First")
.description("Read First")
.required(true)
.allowableValues("true", "false")
.expressionLanguageSupported(NONE)
.defaultValue("false")
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.build();
@Override
public Set<Relationship> getRelationships() {
return Collections.singleton(REL_SUCCESS);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(CLONE_ITERATIONS);
properties.add(WRITE_ITERATIONS);
properties.add(READ_FIRST);
return properties;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile original = session.get();
if (original == null) {
return;
}
try (final InputStream in = session.read(original)) {
final long originalLength = countBytes(in);
getLogger().info("Original FlowFile is " + originalLength + " bytes");
} catch (final IOException e) {
throw new ProcessException(e);
}
final int cloneIterations = context.getProperty(CLONE_ITERATIONS).asInteger();
final int writeIterations = context.getProperty(WRITE_ITERATIONS).asInteger();
final boolean readFirst = context.getProperty(READ_FIRST).asBoolean();
for (int i=0; i < cloneIterations; i++) {
FlowFile clone = session.clone(original);
for (int w = 0; w < writeIterations; w++) {
if (readFirst) {
try (InputStream in = session.read(clone)) {
final long len = countBytes(in);
getLogger().info("Read " + len + " bytes");
} catch (IOException e) {
throw new ProcessException(e);
}
}
clone = session.write(clone, out -> out.write("boom".getBytes()));
clone = session.write(clone, StreamUtils::copy);
}
session.transfer(clone, REL_SUCCESS);
}
session.transfer(original, REL_SUCCESS);
}
private long countBytes(final InputStream in) throws IOException {
int len = 0;
while (in.read() >= 0) {
len++;
}
return len;
}
}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)