You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nifi.apache.org by jskora <gi...@git.apache.org> on 2016/05/20 17:16:30 UTC

[GitHub] nifi pull request: NIFI-1829 - Create new DebugFlow processor.

GitHub user jskora opened a pull request:

    https://github.com/apache/nifi/pull/458

    NIFI-1829 - Create new DebugFlow processor.

    * Create DebugFlow processor for use in testing and troubleshooting the processor framework.
    
    * 0.x - commits as is.
    * 1.x - in o.a.n.p.s.DebugFlow.java replace ProcessorLog references with ComponentLog.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jskora/nifi NIFI-1829

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/458.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #458
    
----
commit 3c7ac5b24a8d9ffd6188482fb50d89b43d6960d0
Author: Joe Skora <js...@apache.org>
Date:   2016-05-20T15:25:03Z

    NIFI-1829 - Create new DebugFlow processor.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #458: NIFI-1829 - Create new DebugFlow processor.

Posted by jskora <gi...@git.apache.org>.
Github user jskora commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/458#discussion_r67341980
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java ---
    @@ -0,0 +1,499 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.http.annotation.ThreadSafe;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.lang.reflect.InvocationTargetException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +@ThreadSafe()
    +@EventDriven()
    +@Tags({"test", "debug", "processor", "utility", "flow", "FlowFile"})
    +@CapabilityDescription("The DebugFlow processor aids testing and debugging the FlowFile framework by allowing various "
    +        + "responses to be explicitly triggered in response to the receipt of a FlowFile or a timer event without a "
    +        + "FlowFile if using timer or cron based scheduling.  It can force responses needed to exercise or test "
    +        + "various failure modes that can occur when a processor runs.\n"
    +        + "\n"
    +        + "When triggered, the processor loops through the appropriate response list (based on whether or not it "
    +        + "received a FlowFile).  A response is produced the configured number of times for each pass through its"
    +        + "response list, as long as the processor is running.\n"
    +        + "\n"
    +        + "Triggered by a FlowFile, the processor can produce the following responses."
    +        + "  1. transfer FlowFile to success relationship.\n"
    +        + "  2. transfer FlowFile to failure relationship.\n"
    +        + "  3. rollback the FlowFile without penalty.\n"
    +        + "  4. rollback the FlowFile and yield the context.\n"
    +        + "  5. rollback the FlowFile with penalty.\n"
    +        + "  6. throw an exception.\n"
    +        + "\n"
    +        + "Triggered without a FlowFile, the processor can produce the following responses."
    +        + "  1. do nothing and return.\n"
    +        + "  2. throw an exception.\n"
    +        + "  3. yield the context.\n")
    +public class DebugFlow extends AbstractProcessor {
    +
    +    private final AtomicReference<Set<Relationship>> relationships = new AtomicReference<>();
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("FlowFiles processed successfully.")
    +            .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("FlowFiles that failed to process.")
    +            .build();
    +
    +    private final AtomicReference<List<PropertyDescriptor>> propertyDescriptors = new AtomicReference<>();
    +
    +    static final PropertyDescriptor FF_SUCCESS_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("FlowFile Success Iterations")
    +            .description("Number of FlowFiles to forward to success relationship.")
    +            .required(true)
    +            .defaultValue("1")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_FAILURE_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("FlowFile Failure Iterations")
    +            .description("Number of FlowFiles to forward to failure relationship.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_ROLLBACK_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("FlowFile Rollback Iterations")
    +            .description("Number of FlowFiles to roll back (without penalty).")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_ROLLBACK_YIELD_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("FlowFile Rollback Yield Iterations")
    +            .description("Number of FlowFiles to roll back and yield.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_ROLLBACK_PENALTY_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("FlowFile Rollback Penalty Iterations")
    +            .description("Number of FlowFiles to roll back with penalty.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_EXCEPTION_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("FlowFile Exception Iterations")
    +            .description("Number of FlowFiles to throw exception.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_EXCEPTION_CLASS = new PropertyDescriptor.Builder()
    +            .name("FlowFile Exception Class")
    +            .description("Exception class to be thrown (must extend java.lang.RuntimeException).")
    +            .required(true)
    +            .defaultValue("java.lang.RuntimeException")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .addValidator(new Validator() {
    +                @Override
    +                public ValidationResult validate(String subject, String input, ValidationContext context) {
    +                    Class<? extends RuntimeException> klass = classNameToRuntimeExceptionClass(input);
    +                    return new ValidationResult.Builder()
    +                            .subject(subject)
    +                            .input(input)
    +                            .valid(klass != null && (RuntimeException.class.isAssignableFrom(klass)))
    +                            .explanation(subject + " class must exist and extend java.lang.RuntimeException")
    +                            .build();
    +                }
    +            })
    +            .build();
    +
    +    static final PropertyDescriptor NO_FF_SKIP_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("No FlowFile Skip Iterations")
    +            .description("Number of times to skip onTrigger if no FlowFile.")
    +            .required(true)
    +            .defaultValue("1")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor NO_FF_EXCEPTION_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("No FlowFile Exception Iterations")
    +            .description("Number of times to throw NPE exception if no FlowFile.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor NO_FF_YIELD_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("No FlowFile Yield Iterations")
    +            .description("Number of times to yield if no FlowFile.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor NO_FF_EXCEPTION_CLASS = new PropertyDescriptor.Builder()
    +            .name("No FlowFile Exception Class")
    +            .description("Exception class to be thrown if no FlowFile (must extend java.lang.RuntimeException).")
    +            .required(true)
    +            .defaultValue("java.lang.RuntimeException")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .addValidator(new Validator() {
    +                @Override
    +                public ValidationResult validate(String subject, String input, ValidationContext context) {
    +                    Class<? extends RuntimeException> klass = classNameToRuntimeExceptionClass(input);
    +                    return new ValidationResult.Builder()
    +                            .subject(subject)
    +                            .input(input)
    +                            .valid(klass != null && (RuntimeException.class.isAssignableFrom(klass)))
    +                            .explanation(subject + " class must exist and extend java.lang.RuntimeException")
    +                            .build();
    +                }
    +            })
    +            .build();
    +
    +
    +    private volatile Integer flowFileMaxSuccess = 0;
    +    private volatile Integer flowFileMaxFailure = 0;
    +    private volatile Integer flowFileMaxRollback = 0;
    +    private volatile Integer flowFileMaxYield = 0;
    +    private volatile Integer flowFileMaxPenalty = 0;
    +    private volatile Integer flowFileMaxException = 0;
    +
    +    private volatile Integer noFlowFileMaxSkip = 0;
    +    private volatile Integer noFlowFileMaxException = 0;
    +    private volatile Integer noFlowFileMaxYield = 0;
    +
    +    private volatile Integer flowFileCurrSuccess = 0;
    +    private volatile Integer flowFileCurrFailure = 0;
    +    private volatile Integer flowFileCurrRollback = 0;
    +    private volatile Integer flowFileCurrYield = 0;
    +    private volatile Integer flowFileCurrPenalty = 0;
    +    private volatile Integer flowFileCurrException = 0;
    +
    +    private volatile Integer noFlowFileCurrSkip = 0;
    +    private volatile Integer noFlowFileCurrException = 0;
    +    private volatile Integer noFlowFileCurrYield = 0;
    +
    +    private volatile Class<? extends RuntimeException> flowFileExceptionClass = null;
    +    private volatile Class<? extends RuntimeException> noFlowFileExceptionClass= null;
    +
    +    private final FlowFileResponse curr_ff_resp = new FlowFileResponse();
    +    private final NoFlowFileResponse curr_noff_resp = new NoFlowFileResponse();
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        synchronized (relationships) {
    +            if (relationships.get() == null) {
    +                HashSet<Relationship> relSet = new HashSet<>();
    +                relSet.add(REL_SUCCESS);
    +                relSet.add(REL_FAILURE);
    +                relationships.compareAndSet(null, Collections.unmodifiableSet(relSet));
    +            }
    +            return relationships.get();
    +        }
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        synchronized (propertyDescriptors) {
    +            if (propertyDescriptors.get() == null) {
    +                ArrayList<PropertyDescriptor> propList = new ArrayList<>();
    +                propList.add(FF_SUCCESS_ITERATIONS);
    +                propList.add(FF_FAILURE_ITERATIONS);
    +                propList.add(FF_ROLLBACK_ITERATIONS);
    +                propList.add(FF_ROLLBACK_YIELD_ITERATIONS);
    +                propList.add(FF_ROLLBACK_PENALTY_ITERATIONS);
    +                propList.add(FF_EXCEPTION_ITERATIONS);
    +                propList.add(FF_EXCEPTION_CLASS);
    +                propList.add(NO_FF_EXCEPTION_ITERATIONS);
    +                propList.add(NO_FF_YIELD_ITERATIONS);
    +                propList.add(NO_FF_SKIP_ITERATIONS);
    +                propList.add(NO_FF_EXCEPTION_CLASS);
    +                propertyDescriptors.compareAndSet(null, Collections.unmodifiableList(propList));
    +            }
    +            return propertyDescriptors.get();
    +        }
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(ProcessContext context) {
    +        flowFileMaxSuccess = context.getProperty(FF_SUCCESS_ITERATIONS).asInteger();
    +        flowFileMaxFailure = context.getProperty(FF_FAILURE_ITERATIONS).asInteger();
    +        flowFileMaxYield = context.getProperty(FF_ROLLBACK_YIELD_ITERATIONS).asInteger();
    +        flowFileMaxRollback = context.getProperty(FF_ROLLBACK_ITERATIONS).asInteger();
    +        flowFileMaxPenalty = context.getProperty(FF_ROLLBACK_PENALTY_ITERATIONS).asInteger();
    +        flowFileMaxException = context.getProperty(FF_EXCEPTION_ITERATIONS).asInteger();
    +        noFlowFileMaxException = context.getProperty(NO_FF_EXCEPTION_ITERATIONS).asInteger();
    +        noFlowFileMaxYield = context.getProperty(NO_FF_YIELD_ITERATIONS).asInteger();
    +        noFlowFileMaxSkip = context.getProperty(NO_FF_SKIP_ITERATIONS).asInteger();
    +        curr_ff_resp.reset();
    +        curr_noff_resp.reset();
    +        flowFileExceptionClass = classNameToRuntimeExceptionClass(context.getProperty(FF_EXCEPTION_CLASS).toString());
    +        noFlowFileExceptionClass = classNameToRuntimeExceptionClass(context.getProperty(NO_FF_EXCEPTION_CLASS).toString());
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        final ComponentLog logger = getLogger();
    +
    +        FlowFile ff = session.get();
    +
    +        // Make up to 2 passes to allow rollover from last cycle to first.
    +        // (This could be "while(true)" since responses should break out  if selected, but this
    +        //  prevents endless loops in the event of unexpected errors or future changes.)
    +        int pass = 2;
    +        while (pass > 0) {
    +            pass -= 1;
    +            if (ff == null) {
    +                if (curr_noff_resp.state() == NoFlowFileResponseState.NO_FF_SKIP_RESPONSE) {
    +                    if (noFlowFileCurrSkip < noFlowFileMaxSkip) {
    +                        noFlowFileCurrSkip += 1;
    +                        logger.info("DebugFlow skipping with no flow file");
    +                        return;
    +                    } else {
    +                        noFlowFileCurrSkip = 0;
    +                        curr_noff_resp.getNextCycle();
    +                    }
    +                }
    +                if (curr_noff_resp.state() == NoFlowFileResponseState.NO_FF_EXCEPTION_RESPONSE) {
    +                    if (noFlowFileCurrException < noFlowFileMaxException) {
    +                        noFlowFileCurrException += 1;
    +                        logger.info("DebugFlow throwing NPE with no flow file");
    +                        String message = "forced by " + this.getClass().getName();
    +                        RuntimeException rte;
    +                        try {
    +                            rte = noFlowFileExceptionClass.getConstructor(String.class).newInstance(message);
    +                            throw rte;
    +                        } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
    +                            e.printStackTrace();
    --- End diff --
    
    That is, I moved both occurrences.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #458: NIFI-1829 - Create new DebugFlow processor.

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on the issue:

    https://github.com/apache/nifi/pull/458
  
    @trkurc ok, in order to get 0.7.0 as soon as possible, would you mind if another committer took over reviewing?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #458: NIFI-1829 - Create new DebugFlow processor.

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on the issue:

    https://github.com/apache/nifi/pull/458
  
    @jskora rebased and addressed comments, @trkurc are you able to finish reviewing this for 0.7.0?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #458: NIFI-1829 - Create new DebugFlow processor.

Posted by jskora <gi...@git.apache.org>.
Github user jskora commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/458#discussion_r65487334
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java ---
    @@ -0,0 +1,414 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.logging.ProcessorLog;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +@EventDriven()
    +@Tags({"test", "debug", "processor", "utility", "flow", "flowfile"})
    +@CapabilityDescription("This processor aids in the testing and debugging of the flowfile framework by allowing "
    +        + "a developer or flow manager to force various responses to a flowfile.  In response to a receiving a flowfile "
    +        + "it can route to the success or failure relationships, rollback, rollback and yield, rollback with penalty, "
    +        + "or throw an exception.  In addition, if using a timer based scheduling strategy, upon being triggered without "
    +        + "a flowfile, it can be configured to throw an exception,  when triggered without a flowfile.\n"
    +        + "\n"
    +        + "The 'iterations' properties, such as \"Success iterations\", configure how many times each response should occur "
    +        + "in succession before moving on to the next response within the group of flowfile responses or no flowfile"
    +        + "responses.\n"
    +        + "\n"
    +        + "The order of responses when a flow file is received are:"
    +        + "  1. transfer flowfile to success relationship.\n"
    +        + "  2. transfer flowfile to failure relationship.\n"
    +        + "  3. rollback the flowfile without penalty.\n"
    +        + "  4. rollback the flowfile and yield the context.\n"
    +        + "  5. rollback the flowfile with penalty.\n"
    +        + "  6. throw an exception.\n"
    +        + "\n"
    +        + "The order of responses when no flow file is received are:"
    +        + "  1. yield the context.\n"
    +        + "  2. throw an exception.\n"
    +        + "  3. do nothing and return.\n"
    +        + "\n"
    +        + "By default, the processor is configured to perform each response one time.  After processing the list of "
    +        + "responses it will resume from the top of the list.\n"
    +        + "\n"
    +        + "To suppress any response, it's value can be set to zero (0) and no responses of that type will occur during "
    +        + "processing.")
    +public class DebugFlow extends AbstractProcessor {
    +
    +    private Set<Relationship> relationships = null;
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("Flowfiles processed successfully.")
    +            .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("Flowfiles that failed to process.")
    +            .build();
    +
    +    private List<PropertyDescriptor> propertyDescriptors = null;
    +
    +    static final PropertyDescriptor FF_SUCCESS_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("Success Iterations")
    +            .description("Number of flowfiles to forward to success relationship.")
    +            .required(true)
    +            .defaultValue("1")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_FAILURE_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("Failure Iterations")
    +            .description("Number of flowfiles to forward to failure relationship.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_ROLLBACK_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("Rollback Iterations")
    +            .description("Number of flowfiles to roll back (without penalty).")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_ROLLBACK_YIELD_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("Rollback Yield Iterations")
    +            .description("Number of flowfiles to roll back and yield.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_ROLLBACK_PENALTY_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("Rollback Penalty Iterations")
    +            .description("Number of flowfiles to roll back with penalty.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_EXCEPTION_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("Exception Iterations")
    +            .description("Number of flowfiles to throw NPE exception.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor NO_FF_EXCEPTION_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("No Flowfile Exception Iterations")
    +            .description("Number of times to throw NPE exception if no flowfile.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor NO_FF_YIELD_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("No Flowfile Yield Iterations")
    +            .description("Number of times to yield if no flowfile.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor NO_FF_SKIP_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("No Flowfile Skip Iterations")
    +            .description("Number of times to skip onTrigger if no flowfile.")
    +            .required(true)
    +            .defaultValue("1")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    Integer FF_SUCCESS_MAX = 0;
    +    private Integer FF_FAILURE_MAX = 0;
    +    private Integer FF_ROLLBACK_MAX = 0;
    +    private Integer FF_YIELD_MAX = 0;
    +    private Integer FF_PENALTY_MAX = 0;
    +    private Integer FF_EXCEPTION_MAX = 0;
    +
    +    private Integer NO_FF_EXCEPTION_MAX = 0;
    +    private Integer NO_FF_YIELD_MAX = 0;
    +    private Integer NO_FF_SKIP_MAX = 0;
    +
    +    private Integer FF_SUCCESS_CURR = 0;
    +    private Integer FF_FAILURE_CURR = 0;
    +    private Integer FF_ROLLBACK_CURR = 0;
    +    private Integer FF_YIELD_CURR = 0;
    +    private Integer FF_PENALTY_CURR = 0;
    +    private Integer FF_EXCEPTION_CURR = 0;
    +
    +    private Integer NO_FF_EXCEPTION_CURR = 0;
    +    private Integer NO_FF_YIELD_CURR = 0;
    +    private Integer NO_FF_SKIP_CURR = 0;
    +
    +    private FlowfileResponse curr_ff_resp;
    +    private NoFlowfileResponse curr_noff_resp;
    +
    +    private enum FlowfileResponse {
    +        FF_SUCCESS_RESPONSE(0, 1),
    +        FF_FAILURE_RESPONSE(1, 2),
    +        FF_ROLLBACK_RESPONSE(2, 3),
    +        FF_YIELD_RESPONSE(3, 4),
    +        FF_PENALTY_RESPONSE(4, 5),
    +        FF_EXCEPTION_RESPONSE(5, 0);
    +
    +        private Integer id;
    +        private Integer nextId;
    +        private FlowfileResponse next;
    +
    +        private static final Map<Integer, FlowfileResponse> byId = new HashMap<>();
    +        static {
    +            for (FlowfileResponse rc : FlowfileResponse.values()) {
    +                if (byId.put(rc.id, rc) != null) {
    +                    throw new IllegalArgumentException("duplicate id: " + rc.id);
    +                }
    +            }
    +            for (FlowfileResponse rc : FlowfileResponse.values()) {
    +                rc.next = byId.get(rc.nextId);
    +            }
    +        }
    +        FlowfileResponse(Integer pId, Integer pNext) {
    +            id = pId;
    +            nextId = pNext;
    +        }
    +        FlowfileResponse getNextCycle() {
    +            return next;
    +        }
    +    }
    +
    +    private enum NoFlowfileResponse {
    +        NO_FF_EXCEPTION_RESPONSE(0, 1),
    +        NO_FF_YIELD_RESPONSE(1, 2),
    +        NO_FF_SKIP_RESPONSE(2, 0);
    +
    +        private Integer id;
    +        private Integer nextId;
    +        private NoFlowfileResponse next;
    +
    +        private static final Map<Integer, NoFlowfileResponse> byId = new HashMap<>();
    +        static {
    +            for (NoFlowfileResponse rc : NoFlowfileResponse.values()) {
    +                if (byId.put(rc.id, rc) != null) {
    +                    throw new IllegalArgumentException("duplicate id: " + rc.id);
    +                }
    +            }
    +            for (NoFlowfileResponse rc : NoFlowfileResponse.values()) {
    +                rc.next = byId.get(rc.nextId);
    +            }
    +        }
    +        NoFlowfileResponse(Integer pId, Integer pNext) {
    +            id = pId;
    +            nextId = pNext;
    +        }
    +        NoFlowfileResponse getNextCycle() {
    +            return next;
    +        }
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        if (relationships == null) {
    +            HashSet<Relationship> relSet = new HashSet<>();
    +            relSet.add(REL_SUCCESS);
    +            relSet.add(REL_FAILURE);
    +            relationships = Collections.unmodifiableSet(relSet);
    +        }
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        if (propertyDescriptors == null) {
    +            ArrayList<PropertyDescriptor> propList = new ArrayList<>();
    +            propList.add(FF_SUCCESS_ITERATIONS);
    +            propList.add(FF_FAILURE_ITERATIONS);
    +            propList.add(FF_ROLLBACK_ITERATIONS);
    +            propList.add(FF_ROLLBACK_YIELD_ITERATIONS);
    +            propList.add(FF_ROLLBACK_PENALTY_ITERATIONS);
    +            propList.add(FF_EXCEPTION_ITERATIONS);
    +            propList.add(NO_FF_EXCEPTION_ITERATIONS);
    +            propList.add(NO_FF_YIELD_ITERATIONS);
    +            propList.add(NO_FF_SKIP_ITERATIONS);
    +            propertyDescriptors = Collections.unmodifiableList(propList);
    +        }
    +        return propertyDescriptors;
    +    }
    +
    +    @SuppressWarnings("unused")
    +    @OnScheduled
    +    public void onScheduled(ProcessContext context) {
    +        FF_SUCCESS_MAX = context.getProperty(FF_SUCCESS_ITERATIONS).asInteger();
    +        FF_FAILURE_MAX = context.getProperty(FF_FAILURE_ITERATIONS).asInteger();
    +        FF_YIELD_MAX = context.getProperty(FF_ROLLBACK_YIELD_ITERATIONS).asInteger();
    +        FF_ROLLBACK_MAX = context.getProperty(FF_ROLLBACK_ITERATIONS).asInteger();
    +        FF_PENALTY_MAX = context.getProperty(FF_ROLLBACK_PENALTY_ITERATIONS).asInteger();
    +        FF_EXCEPTION_MAX = context.getProperty(FF_EXCEPTION_ITERATIONS).asInteger();
    +        NO_FF_EXCEPTION_MAX = context.getProperty(NO_FF_EXCEPTION_ITERATIONS).asInteger();
    +        NO_FF_YIELD_MAX = context.getProperty(NO_FF_YIELD_ITERATIONS).asInteger();
    +        NO_FF_SKIP_MAX = context.getProperty(NO_FF_SKIP_ITERATIONS).asInteger();
    +        curr_ff_resp = FlowfileResponse.FF_SUCCESS_RESPONSE;
    +        curr_noff_resp = NoFlowfileResponse.NO_FF_EXCEPTION_RESPONSE;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        final ProcessorLog logger = getLogger();
    +
    +        FlowFile ff = session.get();
    +
    +        // Make up to 2 passes to allow rollover from last cycle to first.
    +        // (This could be "while(true)" since responses should break out  if selected, but this
    +        //  prevents endless loops in the event of unexpected errors or future changes.)
    +        int pass = 2;
    +        while (pass > 0) {
    +            pass -= 1;
    +            if (ff == null) {
    +                if (curr_noff_resp == NoFlowfileResponse.NO_FF_EXCEPTION_RESPONSE) {
    +                    if (NO_FF_EXCEPTION_CURR < NO_FF_EXCEPTION_MAX) {
    +                        NO_FF_EXCEPTION_CURR += 1;
    +                        logger.info("DebugFlow throwing NPE with no flow file");
    +                        throw new NullPointerException("forced by " + this.getClass().getName());
    +                    } else {
    +                        NO_FF_EXCEPTION_CURR = 0;
    +                        curr_noff_resp = curr_noff_resp.getNextCycle();
    +                    }
    +                }
    +                if (curr_noff_resp == NoFlowfileResponse.NO_FF_YIELD_RESPONSE) {
    +                    if (NO_FF_YIELD_CURR < NO_FF_YIELD_MAX) {
    +                        NO_FF_YIELD_CURR += 1;
    +                        logger.info("DebugFlow yielding with no flow file");
    +                        context.yield();
    +                        break;
    +                    } else {
    +                        NO_FF_YIELD_CURR = 0;
    +                        curr_noff_resp = curr_noff_resp.getNextCycle();
    +                    }
    +                }
    +                if (curr_noff_resp == NoFlowfileResponse.NO_FF_SKIP_RESPONSE) {
    +                    if (NO_FF_SKIP_CURR < NO_FF_SKIP_MAX) {
    +                        NO_FF_SKIP_CURR += 1;
    +                        logger.info("DebugFlow skipping with no flow file");
    +                        return;
    +                    } else {
    +                        NO_FF_SKIP_CURR = 0;
    +                        curr_noff_resp = curr_noff_resp.getNextCycle();
    +                    }
    +                }
    +                return;
    +            } else {
    +                if (curr_ff_resp == FlowfileResponse.FF_SUCCESS_RESPONSE) {
    +                    if (FF_SUCCESS_CURR < FF_SUCCESS_MAX) {
    +                        FF_SUCCESS_CURR += 1;
    +                        logger.info("DebugFlow transferring to success file={} UUID={}",
    +                                new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
    +                                        ff.getAttribute(CoreAttributes.UUID.key())});
    +                        session.transfer(ff, REL_SUCCESS);
    +                        session.commit();
    +                        break;
    +                    } else {
    +                        FF_SUCCESS_CURR = 0;
    +                        curr_ff_resp = curr_ff_resp.getNextCycle();
    +                    }
    +                }
    +                if (curr_ff_resp == FlowfileResponse.FF_FAILURE_RESPONSE) {
    +                    if (FF_FAILURE_CURR < FF_FAILURE_MAX) {
    +                        FF_FAILURE_CURR += 1;
    +                        logger.info("DebugFlow transferring to failure file={} UUID={}",
    +                                new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
    +                                        ff.getAttribute(CoreAttributes.UUID.key())});
    +                        session.transfer(ff, REL_FAILURE);
    +                        session.commit();
    +                        break;
    +                    } else {
    +                        FF_FAILURE_CURR = 0;
    +                        curr_ff_resp = curr_ff_resp.getNextCycle();
    +                    }
    +                }
    +                if (curr_ff_resp == FlowfileResponse.FF_ROLLBACK_RESPONSE) {
    +                    if (FF_ROLLBACK_CURR < FF_ROLLBACK_MAX) {
    +                        FF_ROLLBACK_CURR += 1;
    +                        logger.info("DebugFlow rolling back (no penalty) file={} UUID={}",
    +                                new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
    +                                        ff.getAttribute(CoreAttributes.UUID.key())});
    +                        session.rollback();
    +                        session.commit();
    +                        break;
    +                    } else {
    +                        FF_ROLLBACK_CURR = 0;
    +                        curr_ff_resp = curr_ff_resp.getNextCycle();
    +                    }
    +                }
    +                if (curr_ff_resp == FlowfileResponse.FF_YIELD_RESPONSE) {
    +                    if (FF_YIELD_CURR < FF_YIELD_MAX) {
    +                        FF_YIELD_CURR += 1;
    +                        logger.info("DebugFlow yielding file={} UUID={}",
    +                                new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
    +                                        ff.getAttribute(CoreAttributes.UUID.key())});
    +                        session.rollback();
    +                        context.yield();
    +                        return;
    +                    } else {
    +                        FF_YIELD_CURR = 0;
    +                        curr_ff_resp = curr_ff_resp.getNextCycle();
    +                    }
    +                }
    +                if (curr_ff_resp == FlowfileResponse.FF_PENALTY_RESPONSE) {
    +                    if (FF_PENALTY_CURR < FF_PENALTY_MAX) {
    +                        FF_PENALTY_CURR += 1;
    +                        logger.info("DebugFlow rolling back (with penalty) file={} UUID={}",
    +                                new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
    +                                        ff.getAttribute(CoreAttributes.UUID.key())});
    +                        session.rollback(true);
    +                        session.commit();
    +                        break;
    +                    } else {
    +                        FF_PENALTY_CURR = 0;
    +                        curr_ff_resp = curr_ff_resp.getNextCycle();
    +                    }
    +                }
    +                if (curr_ff_resp == FlowfileResponse.FF_EXCEPTION_RESPONSE) {
    +                    if (FF_EXCEPTION_CURR < FF_EXCEPTION_MAX) {
    +                        FF_EXCEPTION_CURR += 1;
    +                        logger.info("DebugFlow throwing NPE file={} UUID={}",
    +                                new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
    +                                        ff.getAttribute(CoreAttributes.UUID.key())});
    +                        throw new NullPointerException("forced by " + this.getClass().getName());
    --- End diff --
    
    I have broaden it out to any RuntimeException.  Other exceptions will be expected to be checked causing compilation errors.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1829 - Create new DebugFlow processor.

Posted by trkurc <gi...@git.apache.org>.
Github user trkurc commented on the pull request:

    https://github.com/apache/nifi/pull/458
  
    reviewing this


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #458: NIFI-1829 - Create new DebugFlow processor.

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on the issue:

    https://github.com/apache/nifi/pull/458
  
    I'm having trouble understanding the naming convention here. "DebugFlow" would indicate that it is debugging the flow that you are putting together, but the comment states "Create DebugFlow processor for use in testing and troubleshooting the processor framework." So it appears that the desire is to debug the framework?
    
    I also wonder if this is something that should be in the nifi codebase, as it really seems like a niche processor that is intended to test very specific things and not something that we would expect typical users to use. I may be wrong, but would GitHub be a more appropriate place for this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1829 - Create new DebugFlow processor.

Posted by trkurc <gi...@git.apache.org>.
Github user trkurc commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/458#discussion_r65231483
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java ---
    @@ -0,0 +1,414 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.logging.ProcessorLog;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +@EventDriven()
    +@Tags({"test", "debug", "processor", "utility", "flow", "flowfile"})
    +@CapabilityDescription("This processor aids in the testing and debugging of the flowfile framework by allowing "
    +        + "a developer or flow manager to force various responses to a flowfile.  In response to a receiving a flowfile "
    +        + "it can route to the success or failure relationships, rollback, rollback and yield, rollback with penalty, "
    +        + "or throw an exception.  In addition, if using a timer based scheduling strategy, upon being triggered without "
    +        + "a flowfile, it can be configured to throw an exception,  when triggered without a flowfile.\n"
    +        + "\n"
    +        + "The 'iterations' properties, such as \"Success iterations\", configure how many times each response should occur "
    +        + "in succession before moving on to the next response within the group of flowfile responses or no flowfile"
    +        + "responses.\n"
    +        + "\n"
    +        + "The order of responses when a flow file is received are:"
    +        + "  1. transfer flowfile to success relationship.\n"
    +        + "  2. transfer flowfile to failure relationship.\n"
    +        + "  3. rollback the flowfile without penalty.\n"
    +        + "  4. rollback the flowfile and yield the context.\n"
    +        + "  5. rollback the flowfile with penalty.\n"
    +        + "  6. throw an exception.\n"
    +        + "\n"
    +        + "The order of responses when no flow file is received are:"
    +        + "  1. yield the context.\n"
    +        + "  2. throw an exception.\n"
    +        + "  3. do nothing and return.\n"
    +        + "\n"
    +        + "By default, the processor is configured to perform each response one time.  After processing the list of "
    +        + "responses it will resume from the top of the list.\n"
    --- End diff --
    
    looking at the code below, it looks like most are configured for 0 times.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #458: NIFI-1829 - Create new DebugFlow processor.

Posted by trkurc <gi...@git.apache.org>.
Github user trkurc commented on the issue:

    https://github.com/apache/nifi/pull/458
  
    Not until this weekend
    On Jun 15, 2016 4:30 PM, "Joe Percivall" <no...@github.com> wrote:
    
    > @jskora <https://github.com/jskora> rebased and addressed comments,
    > @trkurc <https://github.com/trkurc> are you able to finish reviewing this
    > for 0.7.0?
    >
    > \u2014
    > You are receiving this because you were mentioned.
    > Reply to this email directly, view it on GitHub
    > <https://github.com/apache/nifi/pull/458#issuecomment-226205185>, or mute
    > the thread
    > <https://github.com/notifications/unsubscribe/AC6MklhVNpoG5GKrpPEsn5Yk0ixSetZ6ks5qMAydgaJpZM4IjZYv>
    > .
    >



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #458: NIFI-1829 - Create new DebugFlow processor.

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the issue:

    https://github.com/apache/nifi/pull/458
  
    The name isn't great but not sure what would be better.  It is indeed a debugger for the framework and less so for a flow.  It is perhaps good to call it 'CreateChaos'.  That said i personally think this is a great idea to have in the build and part of the standard processors.  It would only be used by power users or curious types.  And for everyone else it is a noop.  But it could make it easier for us to demonstrate the evil effects of constant exceptions in a flow and do interesting regression checks on various infrastructures.  There is admittedly probably a better place for this but with the upcoming nar registry work we can tackle that then.  


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #458: NIFI-1829 - Create new DebugFlow processor.

Posted by mosermw <gi...@git.apache.org>.
Github user mosermw commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/458#discussion_r67230370
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java ---
    @@ -0,0 +1,499 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.http.annotation.ThreadSafe;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.lang.reflect.InvocationTargetException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +@ThreadSafe()
    +@EventDriven()
    +@Tags({"test", "debug", "processor", "utility", "flow", "FlowFile"})
    +@CapabilityDescription("The DebugFlow processor aids testing and debugging the FlowFile framework by allowing various "
    +        + "responses to be explicitly triggered in response to the receipt of a FlowFile or a timer event without a "
    +        + "FlowFile if using timer or cron based scheduling.  It can force responses needed to exercise or test "
    +        + "various failure modes that can occur when a processor runs.\n"
    +        + "\n"
    +        + "When triggered, the processor loops through the appropriate response list (based on whether or not it "
    +        + "received a FlowFile).  A response is produced the configured number of times for each pass through its"
    +        + "response list, as long as the processor is running.\n"
    +        + "\n"
    +        + "Triggered by a FlowFile, the processor can produce the following responses."
    +        + "  1. transfer FlowFile to success relationship.\n"
    +        + "  2. transfer FlowFile to failure relationship.\n"
    +        + "  3. rollback the FlowFile without penalty.\n"
    +        + "  4. rollback the FlowFile and yield the context.\n"
    +        + "  5. rollback the FlowFile with penalty.\n"
    +        + "  6. throw an exception.\n"
    +        + "\n"
    +        + "Triggered without a FlowFile, the processor can produce the following responses."
    +        + "  1. do nothing and return.\n"
    +        + "  2. throw an exception.\n"
    +        + "  3. yield the context.\n")
    +public class DebugFlow extends AbstractProcessor {
    +
    +    private final AtomicReference<Set<Relationship>> relationships = new AtomicReference<>();
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("FlowFiles processed successfully.")
    +            .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("FlowFiles that failed to process.")
    +            .build();
    +
    +    private final AtomicReference<List<PropertyDescriptor>> propertyDescriptors = new AtomicReference<>();
    +
    +    static final PropertyDescriptor FF_SUCCESS_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("FlowFile Success Iterations")
    +            .description("Number of FlowFiles to forward to success relationship.")
    +            .required(true)
    +            .defaultValue("1")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_FAILURE_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("FlowFile Failure Iterations")
    +            .description("Number of FlowFiles to forward to failure relationship.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_ROLLBACK_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("FlowFile Rollback Iterations")
    +            .description("Number of FlowFiles to roll back (without penalty).")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_ROLLBACK_YIELD_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("FlowFile Rollback Yield Iterations")
    +            .description("Number of FlowFiles to roll back and yield.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_ROLLBACK_PENALTY_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("FlowFile Rollback Penalty Iterations")
    +            .description("Number of FlowFiles to roll back with penalty.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_EXCEPTION_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("FlowFile Exception Iterations")
    +            .description("Number of FlowFiles to throw exception.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_EXCEPTION_CLASS = new PropertyDescriptor.Builder()
    +            .name("FlowFile Exception Class")
    +            .description("Exception class to be thrown (must extend java.lang.RuntimeException).")
    +            .required(true)
    +            .defaultValue("java.lang.RuntimeException")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .addValidator(new Validator() {
    +                @Override
    +                public ValidationResult validate(String subject, String input, ValidationContext context) {
    +                    Class<? extends RuntimeException> klass = classNameToRuntimeExceptionClass(input);
    +                    return new ValidationResult.Builder()
    +                            .subject(subject)
    +                            .input(input)
    +                            .valid(klass != null && (RuntimeException.class.isAssignableFrom(klass)))
    +                            .explanation(subject + " class must exist and extend java.lang.RuntimeException")
    +                            .build();
    +                }
    +            })
    +            .build();
    +
    +    static final PropertyDescriptor NO_FF_SKIP_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("No FlowFile Skip Iterations")
    +            .description("Number of times to skip onTrigger if no FlowFile.")
    +            .required(true)
    +            .defaultValue("1")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor NO_FF_EXCEPTION_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("No FlowFile Exception Iterations")
    +            .description("Number of times to throw NPE exception if no FlowFile.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor NO_FF_YIELD_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("No FlowFile Yield Iterations")
    +            .description("Number of times to yield if no FlowFile.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor NO_FF_EXCEPTION_CLASS = new PropertyDescriptor.Builder()
    +            .name("No FlowFile Exception Class")
    +            .description("Exception class to be thrown if no FlowFile (must extend java.lang.RuntimeException).")
    +            .required(true)
    +            .defaultValue("java.lang.RuntimeException")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .addValidator(new Validator() {
    +                @Override
    +                public ValidationResult validate(String subject, String input, ValidationContext context) {
    +                    Class<? extends RuntimeException> klass = classNameToRuntimeExceptionClass(input);
    +                    return new ValidationResult.Builder()
    +                            .subject(subject)
    +                            .input(input)
    +                            .valid(klass != null && (RuntimeException.class.isAssignableFrom(klass)))
    +                            .explanation(subject + " class must exist and extend java.lang.RuntimeException")
    +                            .build();
    +                }
    +            })
    +            .build();
    +
    +
    +    private volatile Integer flowFileMaxSuccess = 0;
    +    private volatile Integer flowFileMaxFailure = 0;
    +    private volatile Integer flowFileMaxRollback = 0;
    +    private volatile Integer flowFileMaxYield = 0;
    +    private volatile Integer flowFileMaxPenalty = 0;
    +    private volatile Integer flowFileMaxException = 0;
    +
    +    private volatile Integer noFlowFileMaxSkip = 0;
    +    private volatile Integer noFlowFileMaxException = 0;
    +    private volatile Integer noFlowFileMaxYield = 0;
    +
    +    private volatile Integer flowFileCurrSuccess = 0;
    +    private volatile Integer flowFileCurrFailure = 0;
    +    private volatile Integer flowFileCurrRollback = 0;
    +    private volatile Integer flowFileCurrYield = 0;
    +    private volatile Integer flowFileCurrPenalty = 0;
    +    private volatile Integer flowFileCurrException = 0;
    +
    +    private volatile Integer noFlowFileCurrSkip = 0;
    +    private volatile Integer noFlowFileCurrException = 0;
    +    private volatile Integer noFlowFileCurrYield = 0;
    +
    +    private volatile Class<? extends RuntimeException> flowFileExceptionClass = null;
    +    private volatile Class<? extends RuntimeException> noFlowFileExceptionClass= null;
    +
    +    private final FlowFileResponse curr_ff_resp = new FlowFileResponse();
    +    private final NoFlowFileResponse curr_noff_resp = new NoFlowFileResponse();
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        synchronized (relationships) {
    +            if (relationships.get() == null) {
    +                HashSet<Relationship> relSet = new HashSet<>();
    +                relSet.add(REL_SUCCESS);
    +                relSet.add(REL_FAILURE);
    +                relationships.compareAndSet(null, Collections.unmodifiableSet(relSet));
    +            }
    +            return relationships.get();
    +        }
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        synchronized (propertyDescriptors) {
    +            if (propertyDescriptors.get() == null) {
    +                ArrayList<PropertyDescriptor> propList = new ArrayList<>();
    +                propList.add(FF_SUCCESS_ITERATIONS);
    +                propList.add(FF_FAILURE_ITERATIONS);
    +                propList.add(FF_ROLLBACK_ITERATIONS);
    +                propList.add(FF_ROLLBACK_YIELD_ITERATIONS);
    +                propList.add(FF_ROLLBACK_PENALTY_ITERATIONS);
    +                propList.add(FF_EXCEPTION_ITERATIONS);
    +                propList.add(FF_EXCEPTION_CLASS);
    +                propList.add(NO_FF_EXCEPTION_ITERATIONS);
    +                propList.add(NO_FF_YIELD_ITERATIONS);
    +                propList.add(NO_FF_SKIP_ITERATIONS);
    +                propList.add(NO_FF_EXCEPTION_CLASS);
    +                propertyDescriptors.compareAndSet(null, Collections.unmodifiableList(propList));
    +            }
    +            return propertyDescriptors.get();
    +        }
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(ProcessContext context) {
    +        flowFileMaxSuccess = context.getProperty(FF_SUCCESS_ITERATIONS).asInteger();
    +        flowFileMaxFailure = context.getProperty(FF_FAILURE_ITERATIONS).asInteger();
    +        flowFileMaxYield = context.getProperty(FF_ROLLBACK_YIELD_ITERATIONS).asInteger();
    +        flowFileMaxRollback = context.getProperty(FF_ROLLBACK_ITERATIONS).asInteger();
    +        flowFileMaxPenalty = context.getProperty(FF_ROLLBACK_PENALTY_ITERATIONS).asInteger();
    +        flowFileMaxException = context.getProperty(FF_EXCEPTION_ITERATIONS).asInteger();
    +        noFlowFileMaxException = context.getProperty(NO_FF_EXCEPTION_ITERATIONS).asInteger();
    +        noFlowFileMaxYield = context.getProperty(NO_FF_YIELD_ITERATIONS).asInteger();
    +        noFlowFileMaxSkip = context.getProperty(NO_FF_SKIP_ITERATIONS).asInteger();
    +        curr_ff_resp.reset();
    +        curr_noff_resp.reset();
    +        flowFileExceptionClass = classNameToRuntimeExceptionClass(context.getProperty(FF_EXCEPTION_CLASS).toString());
    +        noFlowFileExceptionClass = classNameToRuntimeExceptionClass(context.getProperty(NO_FF_EXCEPTION_CLASS).toString());
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        final ComponentLog logger = getLogger();
    +
    +        FlowFile ff = session.get();
    +
    +        // Make up to 2 passes to allow rollover from last cycle to first.
    +        // (This could be "while(true)" since responses should break out  if selected, but this
    +        //  prevents endless loops in the event of unexpected errors or future changes.)
    +        int pass = 2;
    +        while (pass > 0) {
    +            pass -= 1;
    +            if (ff == null) {
    +                if (curr_noff_resp.state() == NoFlowFileResponseState.NO_FF_SKIP_RESPONSE) {
    +                    if (noFlowFileCurrSkip < noFlowFileMaxSkip) {
    +                        noFlowFileCurrSkip += 1;
    +                        logger.info("DebugFlow skipping with no flow file");
    +                        return;
    +                    } else {
    +                        noFlowFileCurrSkip = 0;
    +                        curr_noff_resp.getNextCycle();
    +                    }
    +                }
    +                if (curr_noff_resp.state() == NoFlowFileResponseState.NO_FF_EXCEPTION_RESPONSE) {
    +                    if (noFlowFileCurrException < noFlowFileMaxException) {
    +                        noFlowFileCurrException += 1;
    +                        logger.info("DebugFlow throwing NPE with no flow file");
    +                        String message = "forced by " + this.getClass().getName();
    +                        RuntimeException rte;
    +                        try {
    +                            rte = noFlowFileExceptionClass.getConstructor(String.class).newInstance(message);
    +                            throw rte;
    +                        } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
    +                            e.printStackTrace();
    --- End diff --
    
    Eeek e.printStackTrace() should be changed to use logger.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1829 - Create new DebugFlow processor.

Posted by trkurc <gi...@git.apache.org>.
Github user trkurc commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/458#discussion_r65231261
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java ---
    @@ -0,0 +1,414 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.logging.ProcessorLog;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +@EventDriven()
    +@Tags({"test", "debug", "processor", "utility", "flow", "flowfile"})
    +@CapabilityDescription("This processor aids in the testing and debugging of the flowfile framework by allowing "
    +        + "a developer or flow manager to force various responses to a flowfile.  In response to a receiving a flowfile "
    +        + "it can route to the success or failure relationships, rollback, rollback and yield, rollback with penalty, "
    +        + "or throw an exception.  In addition, if using a timer based scheduling strategy, upon being triggered without "
    +        + "a flowfile, it can be configured to throw an exception,  when triggered without a flowfile.\n"
    +        + "\n"
    +        + "The 'iterations' properties, such as \"Success iterations\", configure how many times each response should occur "
    +        + "in succession before moving on to the next response within the group of flowfile responses or no flowfile"
    +        + "responses.\n"
    +        + "\n"
    +        + "The order of responses when a flow file is received are:"
    +        + "  1. transfer flowfile to success relationship.\n"
    +        + "  2. transfer flowfile to failure relationship.\n"
    +        + "  3. rollback the flowfile without penalty.\n"
    +        + "  4. rollback the flowfile and yield the context.\n"
    +        + "  5. rollback the flowfile with penalty.\n"
    +        + "  6. throw an exception.\n"
    +        + "\n"
    +        + "The order of responses when no flow file is received are:"
    +        + "  1. yield the context.\n"
    +        + "  2. throw an exception.\n"
    +        + "  3. do nothing and return.\n"
    +        + "\n"
    +        + "By default, the processor is configured to perform each response one time.  After processing the list of "
    +        + "responses it will resume from the top of the list.\n"
    +        + "\n"
    +        + "To suppress any response, it's value can be set to zero (0) and no responses of that type will occur during "
    +        + "processing.")
    +public class DebugFlow extends AbstractProcessor {
    +
    +    private Set<Relationship> relationships = null;
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("Flowfiles processed successfully.")
    +            .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("Flowfiles that failed to process.")
    +            .build();
    +
    +    private List<PropertyDescriptor> propertyDescriptors = null;
    +
    +    static final PropertyDescriptor FF_SUCCESS_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("Success Iterations")
    +            .description("Number of flowfiles to forward to success relationship.")
    +            .required(true)
    +            .defaultValue("1")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_FAILURE_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("Failure Iterations")
    +            .description("Number of flowfiles to forward to failure relationship.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_ROLLBACK_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("Rollback Iterations")
    +            .description("Number of flowfiles to roll back (without penalty).")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_ROLLBACK_YIELD_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("Rollback Yield Iterations")
    +            .description("Number of flowfiles to roll back and yield.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_ROLLBACK_PENALTY_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("Rollback Penalty Iterations")
    +            .description("Number of flowfiles to roll back with penalty.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_EXCEPTION_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("Exception Iterations")
    +            .description("Number of flowfiles to throw NPE exception.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor NO_FF_EXCEPTION_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("No Flowfile Exception Iterations")
    +            .description("Number of times to throw NPE exception if no flowfile.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor NO_FF_YIELD_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("No Flowfile Yield Iterations")
    +            .description("Number of times to yield if no flowfile.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor NO_FF_SKIP_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("No Flowfile Skip Iterations")
    +            .description("Number of times to skip onTrigger if no flowfile.")
    +            .required(true)
    +            .defaultValue("1")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    Integer FF_SUCCESS_MAX = 0;
    +    private Integer FF_FAILURE_MAX = 0;
    +    private Integer FF_ROLLBACK_MAX = 0;
    +    private Integer FF_YIELD_MAX = 0;
    +    private Integer FF_PENALTY_MAX = 0;
    +    private Integer FF_EXCEPTION_MAX = 0;
    +
    +    private Integer NO_FF_EXCEPTION_MAX = 0;
    +    private Integer NO_FF_YIELD_MAX = 0;
    +    private Integer NO_FF_SKIP_MAX = 0;
    +
    +    private Integer FF_SUCCESS_CURR = 0;
    +    private Integer FF_FAILURE_CURR = 0;
    +    private Integer FF_ROLLBACK_CURR = 0;
    +    private Integer FF_YIELD_CURR = 0;
    +    private Integer FF_PENALTY_CURR = 0;
    +    private Integer FF_EXCEPTION_CURR = 0;
    +
    +    private Integer NO_FF_EXCEPTION_CURR = 0;
    +    private Integer NO_FF_YIELD_CURR = 0;
    +    private Integer NO_FF_SKIP_CURR = 0;
    +
    +    private FlowfileResponse curr_ff_resp;
    +    private NoFlowfileResponse curr_noff_resp;
    +
    +    private enum FlowfileResponse {
    +        FF_SUCCESS_RESPONSE(0, 1),
    +        FF_FAILURE_RESPONSE(1, 2),
    +        FF_ROLLBACK_RESPONSE(2, 3),
    +        FF_YIELD_RESPONSE(3, 4),
    +        FF_PENALTY_RESPONSE(4, 5),
    +        FF_EXCEPTION_RESPONSE(5, 0);
    +
    +        private Integer id;
    +        private Integer nextId;
    +        private FlowfileResponse next;
    +
    +        private static final Map<Integer, FlowfileResponse> byId = new HashMap<>();
    +        static {
    +            for (FlowfileResponse rc : FlowfileResponse.values()) {
    +                if (byId.put(rc.id, rc) != null) {
    +                    throw new IllegalArgumentException("duplicate id: " + rc.id);
    +                }
    +            }
    +            for (FlowfileResponse rc : FlowfileResponse.values()) {
    +                rc.next = byId.get(rc.nextId);
    +            }
    +        }
    +        FlowfileResponse(Integer pId, Integer pNext) {
    +            id = pId;
    +            nextId = pNext;
    +        }
    +        FlowfileResponse getNextCycle() {
    +            return next;
    +        }
    +    }
    +
    +    private enum NoFlowfileResponse {
    +        NO_FF_EXCEPTION_RESPONSE(0, 1),
    +        NO_FF_YIELD_RESPONSE(1, 2),
    +        NO_FF_SKIP_RESPONSE(2, 0);
    +
    +        private Integer id;
    +        private Integer nextId;
    +        private NoFlowfileResponse next;
    +
    +        private static final Map<Integer, NoFlowfileResponse> byId = new HashMap<>();
    +        static {
    +            for (NoFlowfileResponse rc : NoFlowfileResponse.values()) {
    +                if (byId.put(rc.id, rc) != null) {
    +                    throw new IllegalArgumentException("duplicate id: " + rc.id);
    +                }
    +            }
    +            for (NoFlowfileResponse rc : NoFlowfileResponse.values()) {
    +                rc.next = byId.get(rc.nextId);
    +            }
    +        }
    +        NoFlowfileResponse(Integer pId, Integer pNext) {
    +            id = pId;
    +            nextId = pNext;
    +        }
    +        NoFlowfileResponse getNextCycle() {
    +            return next;
    +        }
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        if (relationships == null) {
    +            HashSet<Relationship> relSet = new HashSet<>();
    +            relSet.add(REL_SUCCESS);
    +            relSet.add(REL_FAILURE);
    +            relationships = Collections.unmodifiableSet(relSet);
    +        }
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        if (propertyDescriptors == null) {
    +            ArrayList<PropertyDescriptor> propList = new ArrayList<>();
    +            propList.add(FF_SUCCESS_ITERATIONS);
    +            propList.add(FF_FAILURE_ITERATIONS);
    +            propList.add(FF_ROLLBACK_ITERATIONS);
    +            propList.add(FF_ROLLBACK_YIELD_ITERATIONS);
    +            propList.add(FF_ROLLBACK_PENALTY_ITERATIONS);
    +            propList.add(FF_EXCEPTION_ITERATIONS);
    +            propList.add(NO_FF_EXCEPTION_ITERATIONS);
    +            propList.add(NO_FF_YIELD_ITERATIONS);
    +            propList.add(NO_FF_SKIP_ITERATIONS);
    +            propertyDescriptors = Collections.unmodifiableList(propList);
    +        }
    +        return propertyDescriptors;
    +    }
    +
    +    @SuppressWarnings("unused")
    +    @OnScheduled
    +    public void onScheduled(ProcessContext context) {
    +        FF_SUCCESS_MAX = context.getProperty(FF_SUCCESS_ITERATIONS).asInteger();
    +        FF_FAILURE_MAX = context.getProperty(FF_FAILURE_ITERATIONS).asInteger();
    +        FF_YIELD_MAX = context.getProperty(FF_ROLLBACK_YIELD_ITERATIONS).asInteger();
    +        FF_ROLLBACK_MAX = context.getProperty(FF_ROLLBACK_ITERATIONS).asInteger();
    +        FF_PENALTY_MAX = context.getProperty(FF_ROLLBACK_PENALTY_ITERATIONS).asInteger();
    +        FF_EXCEPTION_MAX = context.getProperty(FF_EXCEPTION_ITERATIONS).asInteger();
    +        NO_FF_EXCEPTION_MAX = context.getProperty(NO_FF_EXCEPTION_ITERATIONS).asInteger();
    +        NO_FF_YIELD_MAX = context.getProperty(NO_FF_YIELD_ITERATIONS).asInteger();
    +        NO_FF_SKIP_MAX = context.getProperty(NO_FF_SKIP_ITERATIONS).asInteger();
    +        curr_ff_resp = FlowfileResponse.FF_SUCCESS_RESPONSE;
    +        curr_noff_resp = NoFlowfileResponse.NO_FF_EXCEPTION_RESPONSE;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        final ProcessorLog logger = getLogger();
    +
    +        FlowFile ff = session.get();
    +
    +        // Make up to 2 passes to allow rollover from last cycle to first.
    +        // (This could be "while(true)" since responses should break out  if selected, but this
    +        //  prevents endless loops in the event of unexpected errors or future changes.)
    +        int pass = 2;
    +        while (pass > 0) {
    +            pass -= 1;
    +            if (ff == null) {
    +                if (curr_noff_resp == NoFlowfileResponse.NO_FF_EXCEPTION_RESPONSE) {
    +                    if (NO_FF_EXCEPTION_CURR < NO_FF_EXCEPTION_MAX) {
    +                        NO_FF_EXCEPTION_CURR += 1;
    +                        logger.info("DebugFlow throwing NPE with no flow file");
    +                        throw new NullPointerException("forced by " + this.getClass().getName());
    +                    } else {
    +                        NO_FF_EXCEPTION_CURR = 0;
    +                        curr_noff_resp = curr_noff_resp.getNextCycle();
    +                    }
    +                }
    +                if (curr_noff_resp == NoFlowfileResponse.NO_FF_YIELD_RESPONSE) {
    +                    if (NO_FF_YIELD_CURR < NO_FF_YIELD_MAX) {
    +                        NO_FF_YIELD_CURR += 1;
    +                        logger.info("DebugFlow yielding with no flow file");
    +                        context.yield();
    +                        break;
    +                    } else {
    +                        NO_FF_YIELD_CURR = 0;
    +                        curr_noff_resp = curr_noff_resp.getNextCycle();
    +                    }
    +                }
    +                if (curr_noff_resp == NoFlowfileResponse.NO_FF_SKIP_RESPONSE) {
    +                    if (NO_FF_SKIP_CURR < NO_FF_SKIP_MAX) {
    +                        NO_FF_SKIP_CURR += 1;
    +                        logger.info("DebugFlow skipping with no flow file");
    +                        return;
    +                    } else {
    +                        NO_FF_SKIP_CURR = 0;
    +                        curr_noff_resp = curr_noff_resp.getNextCycle();
    +                    }
    +                }
    +                return;
    +            } else {
    +                if (curr_ff_resp == FlowfileResponse.FF_SUCCESS_RESPONSE) {
    +                    if (FF_SUCCESS_CURR < FF_SUCCESS_MAX) {
    +                        FF_SUCCESS_CURR += 1;
    +                        logger.info("DebugFlow transferring to success file={} UUID={}",
    +                                new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
    +                                        ff.getAttribute(CoreAttributes.UUID.key())});
    +                        session.transfer(ff, REL_SUCCESS);
    +                        session.commit();
    +                        break;
    +                    } else {
    +                        FF_SUCCESS_CURR = 0;
    +                        curr_ff_resp = curr_ff_resp.getNextCycle();
    +                    }
    +                }
    +                if (curr_ff_resp == FlowfileResponse.FF_FAILURE_RESPONSE) {
    +                    if (FF_FAILURE_CURR < FF_FAILURE_MAX) {
    +                        FF_FAILURE_CURR += 1;
    +                        logger.info("DebugFlow transferring to failure file={} UUID={}",
    +                                new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
    +                                        ff.getAttribute(CoreAttributes.UUID.key())});
    +                        session.transfer(ff, REL_FAILURE);
    +                        session.commit();
    +                        break;
    +                    } else {
    +                        FF_FAILURE_CURR = 0;
    +                        curr_ff_resp = curr_ff_resp.getNextCycle();
    +                    }
    +                }
    +                if (curr_ff_resp == FlowfileResponse.FF_ROLLBACK_RESPONSE) {
    +                    if (FF_ROLLBACK_CURR < FF_ROLLBACK_MAX) {
    +                        FF_ROLLBACK_CURR += 1;
    +                        logger.info("DebugFlow rolling back (no penalty) file={} UUID={}",
    +                                new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
    +                                        ff.getAttribute(CoreAttributes.UUID.key())});
    +                        session.rollback();
    +                        session.commit();
    +                        break;
    +                    } else {
    +                        FF_ROLLBACK_CURR = 0;
    +                        curr_ff_resp = curr_ff_resp.getNextCycle();
    +                    }
    +                }
    +                if (curr_ff_resp == FlowfileResponse.FF_YIELD_RESPONSE) {
    +                    if (FF_YIELD_CURR < FF_YIELD_MAX) {
    +                        FF_YIELD_CURR += 1;
    +                        logger.info("DebugFlow yielding file={} UUID={}",
    +                                new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
    +                                        ff.getAttribute(CoreAttributes.UUID.key())});
    +                        session.rollback();
    +                        context.yield();
    +                        return;
    +                    } else {
    +                        FF_YIELD_CURR = 0;
    +                        curr_ff_resp = curr_ff_resp.getNextCycle();
    +                    }
    +                }
    +                if (curr_ff_resp == FlowfileResponse.FF_PENALTY_RESPONSE) {
    +                    if (FF_PENALTY_CURR < FF_PENALTY_MAX) {
    +                        FF_PENALTY_CURR += 1;
    +                        logger.info("DebugFlow rolling back (with penalty) file={} UUID={}",
    +                                new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
    +                                        ff.getAttribute(CoreAttributes.UUID.key())});
    +                        session.rollback(true);
    +                        session.commit();
    +                        break;
    +                    } else {
    +                        FF_PENALTY_CURR = 0;
    +                        curr_ff_resp = curr_ff_resp.getNextCycle();
    +                    }
    +                }
    +                if (curr_ff_resp == FlowfileResponse.FF_EXCEPTION_RESPONSE) {
    +                    if (FF_EXCEPTION_CURR < FF_EXCEPTION_MAX) {
    +                        FF_EXCEPTION_CURR += 1;
    +                        logger.info("DebugFlow throwing NPE file={} UUID={}",
    +                                new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
    +                                        ff.getAttribute(CoreAttributes.UUID.key())});
    +                        throw new NullPointerException("forced by " + this.getClass().getName());
    --- End diff --
    
    Why an NPE here? Would being allowed to specify the class of a Throwable in the configuration to throw here be overkill?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #458: NIFI-1829 - Create new DebugFlow processor.

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/458


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #458: NIFI-1829 - Create new DebugFlow processor.

Posted by jskora <gi...@git.apache.org>.
Github user jskora commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/458#discussion_r67335539
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java ---
    @@ -0,0 +1,499 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.http.annotation.ThreadSafe;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.lang.reflect.InvocationTargetException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +@ThreadSafe()
    +@EventDriven()
    +@Tags({"test", "debug", "processor", "utility", "flow", "FlowFile"})
    +@CapabilityDescription("The DebugFlow processor aids testing and debugging the FlowFile framework by allowing various "
    +        + "responses to be explicitly triggered in response to the receipt of a FlowFile or a timer event without a "
    +        + "FlowFile if using timer or cron based scheduling.  It can force responses needed to exercise or test "
    +        + "various failure modes that can occur when a processor runs.\n"
    +        + "\n"
    +        + "When triggered, the processor loops through the appropriate response list (based on whether or not it "
    +        + "received a FlowFile).  A response is produced the configured number of times for each pass through its"
    +        + "response list, as long as the processor is running.\n"
    +        + "\n"
    +        + "Triggered by a FlowFile, the processor can produce the following responses."
    +        + "  1. transfer FlowFile to success relationship.\n"
    +        + "  2. transfer FlowFile to failure relationship.\n"
    +        + "  3. rollback the FlowFile without penalty.\n"
    +        + "  4. rollback the FlowFile and yield the context.\n"
    +        + "  5. rollback the FlowFile with penalty.\n"
    +        + "  6. throw an exception.\n"
    +        + "\n"
    +        + "Triggered without a FlowFile, the processor can produce the following responses."
    +        + "  1. do nothing and return.\n"
    +        + "  2. throw an exception.\n"
    +        + "  3. yield the context.\n")
    +public class DebugFlow extends AbstractProcessor {
    +
    +    private final AtomicReference<Set<Relationship>> relationships = new AtomicReference<>();
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("FlowFiles processed successfully.")
    +            .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("FlowFiles that failed to process.")
    +            .build();
    +
    +    private final AtomicReference<List<PropertyDescriptor>> propertyDescriptors = new AtomicReference<>();
    +
    +    static final PropertyDescriptor FF_SUCCESS_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("FlowFile Success Iterations")
    +            .description("Number of FlowFiles to forward to success relationship.")
    +            .required(true)
    +            .defaultValue("1")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_FAILURE_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("FlowFile Failure Iterations")
    +            .description("Number of FlowFiles to forward to failure relationship.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_ROLLBACK_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("FlowFile Rollback Iterations")
    +            .description("Number of FlowFiles to roll back (without penalty).")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_ROLLBACK_YIELD_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("FlowFile Rollback Yield Iterations")
    +            .description("Number of FlowFiles to roll back and yield.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_ROLLBACK_PENALTY_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("FlowFile Rollback Penalty Iterations")
    +            .description("Number of FlowFiles to roll back with penalty.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_EXCEPTION_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("FlowFile Exception Iterations")
    +            .description("Number of FlowFiles to throw exception.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_EXCEPTION_CLASS = new PropertyDescriptor.Builder()
    +            .name("FlowFile Exception Class")
    +            .description("Exception class to be thrown (must extend java.lang.RuntimeException).")
    +            .required(true)
    +            .defaultValue("java.lang.RuntimeException")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .addValidator(new Validator() {
    +                @Override
    +                public ValidationResult validate(String subject, String input, ValidationContext context) {
    +                    Class<? extends RuntimeException> klass = classNameToRuntimeExceptionClass(input);
    +                    return new ValidationResult.Builder()
    +                            .subject(subject)
    +                            .input(input)
    +                            .valid(klass != null && (RuntimeException.class.isAssignableFrom(klass)))
    +                            .explanation(subject + " class must exist and extend java.lang.RuntimeException")
    +                            .build();
    +                }
    +            })
    +            .build();
    +
    +    static final PropertyDescriptor NO_FF_SKIP_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("No FlowFile Skip Iterations")
    +            .description("Number of times to skip onTrigger if no FlowFile.")
    +            .required(true)
    +            .defaultValue("1")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor NO_FF_EXCEPTION_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("No FlowFile Exception Iterations")
    +            .description("Number of times to throw NPE exception if no FlowFile.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor NO_FF_YIELD_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("No FlowFile Yield Iterations")
    +            .description("Number of times to yield if no FlowFile.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor NO_FF_EXCEPTION_CLASS = new PropertyDescriptor.Builder()
    +            .name("No FlowFile Exception Class")
    +            .description("Exception class to be thrown if no FlowFile (must extend java.lang.RuntimeException).")
    +            .required(true)
    +            .defaultValue("java.lang.RuntimeException")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .addValidator(new Validator() {
    +                @Override
    +                public ValidationResult validate(String subject, String input, ValidationContext context) {
    +                    Class<? extends RuntimeException> klass = classNameToRuntimeExceptionClass(input);
    +                    return new ValidationResult.Builder()
    +                            .subject(subject)
    +                            .input(input)
    +                            .valid(klass != null && (RuntimeException.class.isAssignableFrom(klass)))
    +                            .explanation(subject + " class must exist and extend java.lang.RuntimeException")
    +                            .build();
    +                }
    +            })
    +            .build();
    +
    +
    +    private volatile Integer flowFileMaxSuccess = 0;
    +    private volatile Integer flowFileMaxFailure = 0;
    +    private volatile Integer flowFileMaxRollback = 0;
    +    private volatile Integer flowFileMaxYield = 0;
    +    private volatile Integer flowFileMaxPenalty = 0;
    +    private volatile Integer flowFileMaxException = 0;
    +
    +    private volatile Integer noFlowFileMaxSkip = 0;
    +    private volatile Integer noFlowFileMaxException = 0;
    +    private volatile Integer noFlowFileMaxYield = 0;
    +
    +    private volatile Integer flowFileCurrSuccess = 0;
    +    private volatile Integer flowFileCurrFailure = 0;
    +    private volatile Integer flowFileCurrRollback = 0;
    +    private volatile Integer flowFileCurrYield = 0;
    +    private volatile Integer flowFileCurrPenalty = 0;
    +    private volatile Integer flowFileCurrException = 0;
    +
    +    private volatile Integer noFlowFileCurrSkip = 0;
    +    private volatile Integer noFlowFileCurrException = 0;
    +    private volatile Integer noFlowFileCurrYield = 0;
    +
    +    private volatile Class<? extends RuntimeException> flowFileExceptionClass = null;
    +    private volatile Class<? extends RuntimeException> noFlowFileExceptionClass= null;
    +
    +    private final FlowFileResponse curr_ff_resp = new FlowFileResponse();
    +    private final NoFlowFileResponse curr_noff_resp = new NoFlowFileResponse();
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        synchronized (relationships) {
    +            if (relationships.get() == null) {
    +                HashSet<Relationship> relSet = new HashSet<>();
    +                relSet.add(REL_SUCCESS);
    +                relSet.add(REL_FAILURE);
    +                relationships.compareAndSet(null, Collections.unmodifiableSet(relSet));
    +            }
    +            return relationships.get();
    +        }
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        synchronized (propertyDescriptors) {
    +            if (propertyDescriptors.get() == null) {
    +                ArrayList<PropertyDescriptor> propList = new ArrayList<>();
    +                propList.add(FF_SUCCESS_ITERATIONS);
    +                propList.add(FF_FAILURE_ITERATIONS);
    +                propList.add(FF_ROLLBACK_ITERATIONS);
    +                propList.add(FF_ROLLBACK_YIELD_ITERATIONS);
    +                propList.add(FF_ROLLBACK_PENALTY_ITERATIONS);
    +                propList.add(FF_EXCEPTION_ITERATIONS);
    +                propList.add(FF_EXCEPTION_CLASS);
    +                propList.add(NO_FF_EXCEPTION_ITERATIONS);
    +                propList.add(NO_FF_YIELD_ITERATIONS);
    +                propList.add(NO_FF_SKIP_ITERATIONS);
    +                propList.add(NO_FF_EXCEPTION_CLASS);
    +                propertyDescriptors.compareAndSet(null, Collections.unmodifiableList(propList));
    +            }
    +            return propertyDescriptors.get();
    +        }
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(ProcessContext context) {
    +        flowFileMaxSuccess = context.getProperty(FF_SUCCESS_ITERATIONS).asInteger();
    +        flowFileMaxFailure = context.getProperty(FF_FAILURE_ITERATIONS).asInteger();
    +        flowFileMaxYield = context.getProperty(FF_ROLLBACK_YIELD_ITERATIONS).asInteger();
    +        flowFileMaxRollback = context.getProperty(FF_ROLLBACK_ITERATIONS).asInteger();
    +        flowFileMaxPenalty = context.getProperty(FF_ROLLBACK_PENALTY_ITERATIONS).asInteger();
    +        flowFileMaxException = context.getProperty(FF_EXCEPTION_ITERATIONS).asInteger();
    +        noFlowFileMaxException = context.getProperty(NO_FF_EXCEPTION_ITERATIONS).asInteger();
    +        noFlowFileMaxYield = context.getProperty(NO_FF_YIELD_ITERATIONS).asInteger();
    +        noFlowFileMaxSkip = context.getProperty(NO_FF_SKIP_ITERATIONS).asInteger();
    +        curr_ff_resp.reset();
    +        curr_noff_resp.reset();
    +        flowFileExceptionClass = classNameToRuntimeExceptionClass(context.getProperty(FF_EXCEPTION_CLASS).toString());
    +        noFlowFileExceptionClass = classNameToRuntimeExceptionClass(context.getProperty(NO_FF_EXCEPTION_CLASS).toString());
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        final ComponentLog logger = getLogger();
    +
    +        FlowFile ff = session.get();
    +
    +        // Make up to 2 passes to allow rollover from last cycle to first.
    +        // (This could be "while(true)" since responses should break out  if selected, but this
    +        //  prevents endless loops in the event of unexpected errors or future changes.)
    +        int pass = 2;
    +        while (pass > 0) {
    +            pass -= 1;
    +            if (ff == null) {
    +                if (curr_noff_resp.state() == NoFlowFileResponseState.NO_FF_SKIP_RESPONSE) {
    +                    if (noFlowFileCurrSkip < noFlowFileMaxSkip) {
    +                        noFlowFileCurrSkip += 1;
    +                        logger.info("DebugFlow skipping with no flow file");
    +                        return;
    +                    } else {
    +                        noFlowFileCurrSkip = 0;
    +                        curr_noff_resp.getNextCycle();
    +                    }
    +                }
    +                if (curr_noff_resp.state() == NoFlowFileResponseState.NO_FF_EXCEPTION_RESPONSE) {
    +                    if (noFlowFileCurrException < noFlowFileMaxException) {
    +                        noFlowFileCurrException += 1;
    +                        logger.info("DebugFlow throwing NPE with no flow file");
    +                        String message = "forced by " + this.getClass().getName();
    +                        RuntimeException rte;
    +                        try {
    +                            rte = noFlowFileExceptionClass.getConstructor(String.class).newInstance(message);
    +                            throw rte;
    +                        } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
    +                            e.printStackTrace();
    --- End diff --
    
    Validation will have verified that the class exists and can be loaded by the processor, so this should (haha) never occur.  I moved it to a debug level log message.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1829 - Create new DebugFlow processor.

Posted by trkurc <gi...@git.apache.org>.
Github user trkurc commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/458#discussion_r65230520
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java ---
    @@ -0,0 +1,414 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.logging.ProcessorLog;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +@EventDriven()
    +@Tags({"test", "debug", "processor", "utility", "flow", "flowfile"})
    +@CapabilityDescription("This processor aids in the testing and debugging of the flowfile framework by allowing "
    +        + "a developer or flow manager to force various responses to a flowfile.  In response to a receiving a flowfile "
    +        + "it can route to the success or failure relationships, rollback, rollback and yield, rollback with penalty, "
    +        + "or throw an exception.  In addition, if using a timer based scheduling strategy, upon being triggered without "
    +        + "a flowfile, it can be configured to throw an exception,  when triggered without a flowfile.\n"
    --- End diff --
    
    I think the last "when triggered ..." is redundant


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1829 - Create new DebugFlow processor.

Posted by trkurc <gi...@git.apache.org>.
Github user trkurc commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/458#discussion_r65230431
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java ---
    @@ -0,0 +1,414 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.logging.ProcessorLog;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +@EventDriven()
    +@Tags({"test", "debug", "processor", "utility", "flow", "flowfile"})
    +@CapabilityDescription("This processor aids in the testing and debugging of the flowfile framework by allowing "
    --- End diff --
    
    recommend s/flowfile/FlowFile/ for consistency


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1829 - Create new DebugFlow processor.

Posted by trkurc <gi...@git.apache.org>.
Github user trkurc commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/458#discussion_r65234934
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java ---
    @@ -0,0 +1,414 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.logging.ProcessorLog;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +@EventDriven()
    +@Tags({"test", "debug", "processor", "utility", "flow", "flowfile"})
    +@CapabilityDescription("This processor aids in the testing and debugging of the flowfile framework by allowing "
    +        + "a developer or flow manager to force various responses to a flowfile.  In response to a receiving a flowfile "
    +        + "it can route to the success or failure relationships, rollback, rollback and yield, rollback with penalty, "
    +        + "or throw an exception.  In addition, if using a timer based scheduling strategy, upon being triggered without "
    +        + "a flowfile, it can be configured to throw an exception,  when triggered without a flowfile.\n"
    +        + "\n"
    +        + "The 'iterations' properties, such as \"Success iterations\", configure how many times each response should occur "
    +        + "in succession before moving on to the next response within the group of flowfile responses or no flowfile"
    +        + "responses.\n"
    +        + "\n"
    +        + "The order of responses when a flow file is received are:"
    +        + "  1. transfer flowfile to success relationship.\n"
    +        + "  2. transfer flowfile to failure relationship.\n"
    +        + "  3. rollback the flowfile without penalty.\n"
    +        + "  4. rollback the flowfile and yield the context.\n"
    +        + "  5. rollback the flowfile with penalty.\n"
    +        + "  6. throw an exception.\n"
    +        + "\n"
    +        + "The order of responses when no flow file is received are:"
    +        + "  1. yield the context.\n"
    +        + "  2. throw an exception.\n"
    +        + "  3. do nothing and return.\n"
    +        + "\n"
    +        + "By default, the processor is configured to perform each response one time.  After processing the list of "
    +        + "responses it will resume from the top of the list.\n"
    +        + "\n"
    +        + "To suppress any response, it's value can be set to zero (0) and no responses of that type will occur during "
    +        + "processing.")
    +public class DebugFlow extends AbstractProcessor {
    +
    +    private Set<Relationship> relationships = null;
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("Flowfiles processed successfully.")
    +            .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("Flowfiles that failed to process.")
    +            .build();
    +
    +    private List<PropertyDescriptor> propertyDescriptors = null;
    +
    +    static final PropertyDescriptor FF_SUCCESS_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("Success Iterations")
    +            .description("Number of flowfiles to forward to success relationship.")
    +            .required(true)
    +            .defaultValue("1")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_FAILURE_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("Failure Iterations")
    +            .description("Number of flowfiles to forward to failure relationship.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_ROLLBACK_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("Rollback Iterations")
    +            .description("Number of flowfiles to roll back (without penalty).")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_ROLLBACK_YIELD_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("Rollback Yield Iterations")
    +            .description("Number of flowfiles to roll back and yield.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_ROLLBACK_PENALTY_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("Rollback Penalty Iterations")
    +            .description("Number of flowfiles to roll back with penalty.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_EXCEPTION_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("Exception Iterations")
    +            .description("Number of flowfiles to throw NPE exception.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor NO_FF_EXCEPTION_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("No Flowfile Exception Iterations")
    +            .description("Number of times to throw NPE exception if no flowfile.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor NO_FF_YIELD_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("No Flowfile Yield Iterations")
    +            .description("Number of times to yield if no flowfile.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor NO_FF_SKIP_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("No Flowfile Skip Iterations")
    +            .description("Number of times to skip onTrigger if no flowfile.")
    +            .required(true)
    +            .defaultValue("1")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    Integer FF_SUCCESS_MAX = 0;
    +    private Integer FF_FAILURE_MAX = 0;
    +    private Integer FF_ROLLBACK_MAX = 0;
    +    private Integer FF_YIELD_MAX = 0;
    +    private Integer FF_PENALTY_MAX = 0;
    +    private Integer FF_EXCEPTION_MAX = 0;
    +
    +    private Integer NO_FF_EXCEPTION_MAX = 0;
    +    private Integer NO_FF_YIELD_MAX = 0;
    +    private Integer NO_FF_SKIP_MAX = 0;
    +
    +    private Integer FF_SUCCESS_CURR = 0;
    --- End diff --
    
    as these are not constants, they shouldn't be all caps. also, I believe there will be concurrency issues with this (need locking and safe publication)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #458: NIFI-1829 - Create new DebugFlow processor.

Posted by jskora <gi...@git.apache.org>.
Github user jskora commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/458#discussion_r65486064
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java ---
    @@ -0,0 +1,414 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.logging.ProcessorLog;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +@EventDriven()
    +@Tags({"test", "debug", "processor", "utility", "flow", "flowfile"})
    +@CapabilityDescription("This processor aids in the testing and debugging of the flowfile framework by allowing "
    +        + "a developer or flow manager to force various responses to a flowfile.  In response to a receiving a flowfile "
    +        + "it can route to the success or failure relationships, rollback, rollback and yield, rollback with penalty, "
    +        + "or throw an exception.  In addition, if using a timer based scheduling strategy, upon being triggered without "
    +        + "a flowfile, it can be configured to throw an exception,  when triggered without a flowfile.\n"
    --- End diff --
    
    Agreed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #458: NIFI-1829 - Create new DebugFlow processor.

Posted by jskora <gi...@git.apache.org>.
Github user jskora commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/458#discussion_r65487473
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java ---
    @@ -0,0 +1,414 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.logging.ProcessorLog;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +@EventDriven()
    +@Tags({"test", "debug", "processor", "utility", "flow", "flowfile"})
    +@CapabilityDescription("This processor aids in the testing and debugging of the flowfile framework by allowing "
    +        + "a developer or flow manager to force various responses to a flowfile.  In response to a receiving a flowfile "
    +        + "it can route to the success or failure relationships, rollback, rollback and yield, rollback with penalty, "
    +        + "or throw an exception.  In addition, if using a timer based scheduling strategy, upon being triggered without "
    +        + "a flowfile, it can be configured to throw an exception,  when triggered without a flowfile.\n"
    +        + "\n"
    +        + "The 'iterations' properties, such as \"Success iterations\", configure how many times each response should occur "
    +        + "in succession before moving on to the next response within the group of flowfile responses or no flowfile"
    +        + "responses.\n"
    +        + "\n"
    +        + "The order of responses when a flow file is received are:"
    +        + "  1. transfer flowfile to success relationship.\n"
    +        + "  2. transfer flowfile to failure relationship.\n"
    +        + "  3. rollback the flowfile without penalty.\n"
    +        + "  4. rollback the flowfile and yield the context.\n"
    +        + "  5. rollback the flowfile with penalty.\n"
    +        + "  6. throw an exception.\n"
    +        + "\n"
    +        + "The order of responses when no flow file is received are:"
    +        + "  1. yield the context.\n"
    +        + "  2. throw an exception.\n"
    +        + "  3. do nothing and return.\n"
    +        + "\n"
    +        + "By default, the processor is configured to perform each response one time.  After processing the list of "
    +        + "responses it will resume from the top of the list.\n"
    +        + "\n"
    +        + "To suppress any response, it's value can be set to zero (0) and no responses of that type will occur during "
    +        + "processing.")
    +public class DebugFlow extends AbstractProcessor {
    +
    +    private Set<Relationship> relationships = null;
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("Flowfiles processed successfully.")
    +            .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("Flowfiles that failed to process.")
    +            .build();
    +
    +    private List<PropertyDescriptor> propertyDescriptors = null;
    +
    +    static final PropertyDescriptor FF_SUCCESS_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("Success Iterations")
    +            .description("Number of flowfiles to forward to success relationship.")
    +            .required(true)
    +            .defaultValue("1")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_FAILURE_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("Failure Iterations")
    +            .description("Number of flowfiles to forward to failure relationship.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_ROLLBACK_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("Rollback Iterations")
    +            .description("Number of flowfiles to roll back (without penalty).")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_ROLLBACK_YIELD_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("Rollback Yield Iterations")
    +            .description("Number of flowfiles to roll back and yield.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_ROLLBACK_PENALTY_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("Rollback Penalty Iterations")
    +            .description("Number of flowfiles to roll back with penalty.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_EXCEPTION_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("Exception Iterations")
    +            .description("Number of flowfiles to throw NPE exception.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor NO_FF_EXCEPTION_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("No Flowfile Exception Iterations")
    +            .description("Number of times to throw NPE exception if no flowfile.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor NO_FF_YIELD_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("No Flowfile Yield Iterations")
    +            .description("Number of times to yield if no flowfile.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor NO_FF_SKIP_ITERATIONS = new PropertyDescriptor.Builder()
    +            .name("No Flowfile Skip Iterations")
    +            .description("Number of times to skip onTrigger if no flowfile.")
    +            .required(true)
    +            .defaultValue("1")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    Integer FF_SUCCESS_MAX = 0;
    +    private Integer FF_FAILURE_MAX = 0;
    +    private Integer FF_ROLLBACK_MAX = 0;
    +    private Integer FF_YIELD_MAX = 0;
    +    private Integer FF_PENALTY_MAX = 0;
    +    private Integer FF_EXCEPTION_MAX = 0;
    +
    +    private Integer NO_FF_EXCEPTION_MAX = 0;
    +    private Integer NO_FF_YIELD_MAX = 0;
    +    private Integer NO_FF_SKIP_MAX = 0;
    +
    +    private Integer FF_SUCCESS_CURR = 0;
    --- End diff --
    
    I will fix the names and looking into cleaning up the concurrency problems.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #458: NIFI-1829 - Create new DebugFlow processor.

Posted by mosermw <gi...@git.apache.org>.
Github user mosermw commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/458#discussion_r67230167
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java ---
    @@ -0,0 +1,499 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.http.annotation.ThreadSafe;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.lang.reflect.InvocationTargetException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +@ThreadSafe()
    +@EventDriven()
    +@Tags({"test", "debug", "processor", "utility", "flow", "FlowFile"})
    +@CapabilityDescription("The DebugFlow processor aids testing and debugging the FlowFile framework by allowing various "
    +        + "responses to be explicitly triggered in response to the receipt of a FlowFile or a timer event without a "
    +        + "FlowFile if using timer or cron based scheduling.  It can force responses needed to exercise or test "
    +        + "various failure modes that can occur when a processor runs.\n"
    +        + "\n"
    +        + "When triggered, the processor loops through the appropriate response list (based on whether or not it "
    +        + "received a FlowFile).  A response is produced the configured number of times for each pass through its"
    +        + "response list, as long as the processor is running.\n"
    +        + "\n"
    +        + "Triggered by a FlowFile, the processor can produce the following responses."
    +        + "  1. transfer FlowFile to success relationship.\n"
    +        + "  2. transfer FlowFile to failure relationship.\n"
    +        + "  3. rollback the FlowFile without penalty.\n"
    +        + "  4. rollback the FlowFile and yield the context.\n"
    +        + "  5. rollback the FlowFile with penalty.\n"
    +        + "  6. throw an exception.\n"
    +        + "\n"
    +        + "Triggered without a FlowFile, the processor can produce the following responses."
    +        + "  1. do nothing and return.\n"
    +        + "  2. throw an exception.\n"
    +        + "  3. yield the context.\n")
    --- End diff --
    
    Formatting with \n doesn't work inside a CapabilityDescription annotation.  It just gets smashed together.  If you are OK with that @jskora then we can leave it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #458: NIFI-1829 - Create new DebugFlow processor.

Posted by mosermw <gi...@git.apache.org>.
Github user mosermw commented on the issue:

    https://github.com/apache/nifi/pull/458
  
    Completed review, latest changes look good, unit testing and integration testing worked as expected, code passes -Pcontrib-check.
    
    +1 I will squash commits and merge to 0.x and master shortly.  Thanks @jskora.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #458: NIFI-1829 - Create new DebugFlow processor.

Posted by trkurc <gi...@git.apache.org>.
Github user trkurc commented on the issue:

    https://github.com/apache/nifi/pull/458
  
    I don't mind even a little! Thanks @mosermw


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #458: NIFI-1829 - Create new DebugFlow processor.

Posted by jskora <gi...@git.apache.org>.
Github user jskora commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/458#discussion_r67335284
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java ---
    @@ -0,0 +1,499 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.http.annotation.ThreadSafe;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.lang.reflect.InvocationTargetException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +@ThreadSafe()
    +@EventDriven()
    +@Tags({"test", "debug", "processor", "utility", "flow", "FlowFile"})
    +@CapabilityDescription("The DebugFlow processor aids testing and debugging the FlowFile framework by allowing various "
    +        + "responses to be explicitly triggered in response to the receipt of a FlowFile or a timer event without a "
    +        + "FlowFile if using timer or cron based scheduling.  It can force responses needed to exercise or test "
    +        + "various failure modes that can occur when a processor runs.\n"
    +        + "\n"
    +        + "When triggered, the processor loops through the appropriate response list (based on whether or not it "
    +        + "received a FlowFile).  A response is produced the configured number of times for each pass through its"
    +        + "response list, as long as the processor is running.\n"
    +        + "\n"
    +        + "Triggered by a FlowFile, the processor can produce the following responses."
    +        + "  1. transfer FlowFile to success relationship.\n"
    +        + "  2. transfer FlowFile to failure relationship.\n"
    +        + "  3. rollback the FlowFile without penalty.\n"
    +        + "  4. rollback the FlowFile and yield the context.\n"
    +        + "  5. rollback the FlowFile with penalty.\n"
    +        + "  6. throw an exception.\n"
    +        + "\n"
    +        + "Triggered without a FlowFile, the processor can produce the following responses."
    +        + "  1. do nothing and return.\n"
    +        + "  2. throw an exception.\n"
    +        + "  3. yield the context.\n")
    --- End diff --
    
    Split out to additional details page.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #458: NIFI-1829 - Create new DebugFlow processor.

Posted by mosermw <gi...@git.apache.org>.
Github user mosermw commented on the issue:

    https://github.com/apache/nifi/pull/458
  
    I will review.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #458: NIFI-1829 - Create new DebugFlow processor.

Posted by trkurc <gi...@git.apache.org>.
Github user trkurc commented on the issue:

    https://github.com/apache/nifi/pull/458
  
    It looks like the issues I had were addressed, but I won't have a chance to dive in deeper until Saturday, maybe


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #458: NIFI-1829 - Create new DebugFlow processor.

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/458#discussion_r67230976
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java ---
    @@ -0,0 +1,499 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.http.annotation.ThreadSafe;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.lang.reflect.InvocationTargetException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +@ThreadSafe()
    +@EventDriven()
    +@Tags({"test", "debug", "processor", "utility", "flow", "FlowFile"})
    +@CapabilityDescription("The DebugFlow processor aids testing and debugging the FlowFile framework by allowing various "
    +        + "responses to be explicitly triggered in response to the receipt of a FlowFile or a timer event without a "
    +        + "FlowFile if using timer or cron based scheduling.  It can force responses needed to exercise or test "
    +        + "various failure modes that can occur when a processor runs.\n"
    +        + "\n"
    +        + "When triggered, the processor loops through the appropriate response list (based on whether or not it "
    +        + "received a FlowFile).  A response is produced the configured number of times for each pass through its"
    +        + "response list, as long as the processor is running.\n"
    +        + "\n"
    +        + "Triggered by a FlowFile, the processor can produce the following responses."
    +        + "  1. transfer FlowFile to success relationship.\n"
    +        + "  2. transfer FlowFile to failure relationship.\n"
    +        + "  3. rollback the FlowFile without penalty.\n"
    +        + "  4. rollback the FlowFile and yield the context.\n"
    +        + "  5. rollback the FlowFile with penalty.\n"
    +        + "  6. throw an exception.\n"
    +        + "\n"
    +        + "Triggered without a FlowFile, the processor can produce the following responses."
    +        + "  1. do nothing and return.\n"
    +        + "  2. throw an exception.\n"
    +        + "  3. yield the context.\n")
    --- End diff --
    
    I wonder if there's an abbreviated version of the doc for capability description, and move this out to an additional details page 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #458: NIFI-1829 - Create new DebugFlow processor.

Posted by jskora <gi...@git.apache.org>.
Github user jskora commented on the issue:

    https://github.com/apache/nifi/pull/458
  
    Ok, I think that last commit covers the review feedback.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---