Posted to dev@nifi.apache.org by joey <gi...@git.apache.org> on 2015/05/05 13:08:32 UTC

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

GitHub user joey opened a pull request:

    https://github.com/apache/incubator-nifi/pull/51

    NIFI-589: Add processors that can run Apache Flume sources/sinks

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/joey/incubator-nifi NIFI-589-flume-processors

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-nifi/pull/51.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #51
    
----
commit 5bb93142c16e412de3062c10d5f84da33bdcee37
Author: Joey Echeverria <jo...@cloudera.com>
Date:   2015-01-29T00:30:49Z

    Added processors that can run Flume sources and Flume sinks.

commit bbf03d2419d62b01ca6474e7a9ce86caafc7f1c7
Author: Joey Echeverria <jo...@cloudera.com>
Date:   2015-01-30T19:21:21Z

    Added the sink's relationships to the relationship set.
    Added error checking and logging for sink/source creation.
    Fixed an issue with transaction management in the sink.
    Reformatted per coding standard.

commit f5c6ffad78d6e48274d9a3df63b20ed7ce2585a4
Author: Joey Echeverria <jo...@gmail.com>
Date:   2015-04-08T00:18:45Z

    Fix poms, versions, add batching to sink processor
    
    * Fix pom issues caused by the rebase.
    * Update the Flume bundle's version to 0.1.0
    * Add support for batching to the sink processor

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-116190999
  
    Should we move ahead to Flume 1.6.0 now?  Appears to still compile/test/run fine with that.
    
    nifi/pom.xml  Formatting issue with derby dependency.  Tabs gone wild.  Fix dependency ref to 0.2.0
    
    nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml  Fix dependency ref to 0.2.0
    
    Two easy check style violations to resolve:
    
    [WARNING] src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java[24:8] (imports) UnusedImports: Unused import - org.apache.flume.Context.
    [WARNING] src/main/java/org/apache/nifi/processors/flume/util/FlowFileEvent.java[30] (imports) AvoidStarImport: Using the '.*' form of import should be avoided - org.apache.nifi.processors.flume.util.FlowFileEventConstants.*.
    
    We need to establish a LICENSE and NOTICE within the nifi-flume-nar project so that the nar binary properly accounts for bundled items, and we need to roll up any unique dependencies into the nifi-assembly LICENSE and NOTICE.  I reviewed all bundled dependencies and they all appear to be legit, so doing this will be tedious but not problematic.  I am happy to do this for you.  There are lots of good examples of how to do this in the other nar bundles, and we’ve augmented ASF guidance for handling licensing here: http://nifi.incubator.apache.org/licensing-guide.html
    
    The various Flume processors within the Nar appear to have a significant version conflict for Netty: they want versions ranging from 3.5.x to 3.8.x.  It is unclear whether this is important, so I am just bringing it to your attention.
    
    The bundled dependencies include a rather old version of Jetty.  I am not clear if this will conflict with the Jetty libraries we use and a quick scan of our docs suggests we don’t point out the limitations of Nar class loader isolation (will make a ticket).  You may have no action here just bringing it to your attention.
    
    Recommend renaming the processors to ‘ExecuteFlumeSource’ and ‘ExecuteFlumeSink’.  The naming structure we try to follow is generally verb-subject.  I did not find that we had this documented (will make a ticket).
    
    This processor is a perfect case for providing some ‘additional details’ documentation so that when someone looks up the usage it will show them how to configure a fully working example.  http://nifi.incubator.apache.org/docs/nifi-docs/html/developer-guide.html#advanced-documentation
    
    Is there any service provider construct or similar mechanism whereby we could provide a dropdown of identified sources and sinks so the user doesn’t have to type them by hand?  Not critical just curious.  Given that the other property is pasting of a flume properties section this is already pretty ‘developer heavy’ anyway so it isn’t necessary.  But if possible it could be a nice convenience to offer.
    
    FlumeSinkProcessor: Because channel.setSession(…) is called during the onTrigger method, I think the processor needs to run serially.  If multiple threads are running, the channel’s session could get overwritten, most likely creating some very interesting threading issues.  Is marking it as serial-only going to be sufficient?  If so, we have an annotation for that which makes the framework ensure the processor only ever gets a single thread.  I think a similar issue is present in FlumeSourceProcessor.
    
    If you happen to have a template of a flow that can be used to test/demonstrate its function that would be a great thing to have.  If so please add to the JIRA.
    
    Looking forward to seeing this merged in.  Let me know if you’d like to address these or discuss them further.
    
    Thanks
    Joe
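The threading concern raised above for channel.setSession(…) can be sketched without any NiFi or Flume classes. Everything in the block below (SerialTriggerSketch, SessionChannel, runConcurrently) is a hypothetical stand-in, not code from the pull request. The synchronized block only emulates what a serial-only processor would get from the framework, presumably via NiFi's @TriggerSerially annotation: one thread in onTrigger at a time, so the session bound to the shared channel cannot be overwritten mid-trigger.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SerialTriggerSketch {

    /** Hypothetical stand-in for a channel that holds exactly one session at a time. */
    public static final class SessionChannel {
        private String session; // shared mutable state, like the field behind setSession(...)

        public void setSession(String session) { this.session = session; }

        public String getSession() { return session; }
    }

    private final SessionChannel channel = new SessionChannel();
    private final Object triggerLock = new Object();
    private final AtomicInteger mismatches = new AtomicInteger();

    /** Emulates onTrigger: bind the session, do some work, check the binding survived. */
    public void onTrigger(String session) {
        synchronized (triggerLock) { // assumption: emulates the framework running triggers serially
            channel.setSession(session);
            Thread.yield(); // invite other threads to interleave here
            if (!session.equals(channel.getSession())) {
                mismatches.incrementAndGet(); // another thread replaced our session
            }
        }
    }

    /** Runs several threads against one processor and returns how many triggers saw a foreign session. */
    public static int runConcurrently(int threads, int triggersPerThread) {
        SerialTriggerSketch processor = new SerialTriggerSketch();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final String session = "session-" + t;
            pool.execute(() -> {
                for (int i = 0; i < triggersPerThread; i++) {
                    processor.onTrigger(session);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("worker threads did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processor.mismatches.get();
    }

    public static void main(String[] args) {
        System.out.println("mismatches: " + runConcurrently(4, 10_000));
    }
}
```

Removing the synchronized block in this sketch allows a trigger to observe a session set by another thread, which is the hazard described in the comment.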




[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joey <gi...@git.apache.org>.
Github user joey commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-120766917
  
    If there are specific sources/sinks that we want to expose as processors that support threading, we could probably do that by adding dedicated processors for them that wrap the generic version.



[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r31856990
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java ---
    @@ -0,0 +1,191 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +import com.google.common.base.Throwables;
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.ImmutableSet;
    +import java.util.List;
    +import java.util.Set;
    +import org.apache.flume.Context;
    +import org.apache.flume.Event;
    +import org.apache.flume.EventDeliveryException;
    +import org.apache.flume.EventDrivenSource;
    +import org.apache.flume.PollableSource;
    +import org.apache.flume.Source;
    +import org.apache.flume.SourceRunner;
    +import org.apache.flume.Transaction;
    +import org.apache.flume.channel.ChannelProcessor;
    +import org.apache.flume.channel.MemoryChannel;
    +import org.apache.flume.conf.Configurables;
    +import org.apache.flume.source.EventDrivenSourceRunner;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.SchedulingContext;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +/**
    + * This processor runs a Flume source
    + */
    +@Tags({"flume", "hadoop", "get", "source"})
    +@CapabilityDescription("Generate FlowFile data from a Flume source")
    +public class FlumeSourceProcessor extends AbstractFlumeProcessor {
    +
    +    private Source source;
    +    private SourceRunner runner;
    +    private MemoryChannel channel;
    +
    +    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder()
    +            .name("Source Type")
    +            .description("The fully-qualified name of the Source class")
    +            .required(true)
    +            .addValidator(createSourceValidator())
    +            .build();
    +    public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder()
    +            .name("Agent Name")
    +            .description("The name of the agent used in the Flume source configuration")
    +            .required(true)
    +            .defaultValue("tier1")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder()
    +            .name("Source Name")
    +            .description("The name of the source used in the Flume source configuration")
    +            .required(true)
    +            .defaultValue("src-1")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder()
    +            .name("Flume Configuration")
    +            .description("The Flume configuration for the source copied from the flume.properties file")
    +            .required(true)
    +            .defaultValue("")
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
    +
    +    private List<PropertyDescriptor> descriptors;
    +    private Set<Relationship> relationships;
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        this.descriptors = ImmutableList.of(SOURCE_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG);
    +        this.relationships = ImmutableSet.of(SUCCESS);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final SchedulingContext context) {
    +        try {
    +            source = SOURCE_FACTORY.create(
    +                    context.getProperty(SOURCE_NAME).getValue(),
    +                    context.getProperty(SOURCE_TYPE).getValue());
    +
    +            String flumeConfig = context.getProperty(FLUME_CONFIG).getValue();
    +            String agentName = context.getProperty(AGENT_NAME).getValue();
    +            String sourceName = context.getProperty(SOURCE_NAME).getValue();
    +            Configurables.configure(source,
    +                    getFlumeSourceContext(flumeConfig, agentName, sourceName));
    +
    +            if (source instanceof EventDrivenSource) {
    +                runner = new EventDrivenSourceRunner();
    +                channel = new MemoryChannel();
    +                Configurables.configure(channel, new Context());
    +                channel.start();
    +                source.setChannelProcessor(new ChannelProcessor(new NifiChannelSelector(channel)));
    +                runner.setSource(source);
    +                runner.start();
    +            }
    +        } catch (Throwable th) {
    +            getLogger().error("Error creating source", th);
    +            throw Throwables.propagate(th);
    +        }
    +    }
    +
    +    @OnUnscheduled
    +    public void unScheduled() {
    +        if (runner != null) {
    +            runner.stop();
    +        }
    +        if (channel != null) {
    +            channel.stop();
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context,
    +            final ProcessSession session) throws ProcessException {
    +        if (source instanceof EventDrivenSource) {
    +            onEventDrivenTrigger(context, session);
    +        } else if (source instanceof PollableSource) {
    +            onPollableTrigger((PollableSource) source, context, session);
    +        }
    +    }
    +
    +    public void onPollableTrigger(final PollableSource pollableSource,
    +            final ProcessContext context, final ProcessSession session)
    +            throws ProcessException {
    +        try {
    +            pollableSource.setChannelProcessor(new ChannelProcessor(
    +                    new NifiChannelSelector(new NifiChannel(session, SUCCESS))));
    --- End diff --
    
    I think the channel, selector, and processor can be reused by setting the session on the channel each time. The current session would be used to create transactions, which we don't expect to be long lived because this is a source. Then you might not have to start and stop the source each time, right?
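The reuse suggested in the comment above might look roughly like the following. This is a minimal sketch under stated assumptions: Session, SessionChannel, and PollableSource here are hypothetical stand-ins written for illustration, not the NiFi or Flume types from the diff. The channel and source are wired together once, and each trigger only swaps the session that the channel forwards to.

```java
import java.util.ArrayList;
import java.util.List;

public class ReusableChannelSketch {

    /** Hypothetical stand-in for a per-trigger session: records the events written to it. */
    public static final class Session {
        public final List<String> received = new ArrayList<>();
    }

    /** Hypothetical channel created once; only the session it forwards to changes. */
    public static final class SessionChannel {
        private Session current;

        public void setSession(Session session) { this.current = session; }

        public void put(String event) {
            if (current == null) {
                throw new IllegalStateException("no session bound to the channel");
            }
            current.received.add(event);
        }
    }

    /** Hypothetical pollable source that is attached to its channel exactly once. */
    public static final class PollableSource {
        private final SessionChannel channel;
        private int counter;

        public PollableSource(SessionChannel channel) { this.channel = channel; }

        public void process() { channel.put("event-" + counter++); }
    }

    // Built once, for example when the processor is scheduled, then reused by every trigger.
    private final SessionChannel channel = new SessionChannel();
    private final PollableSource source = new PollableSource(channel);

    /** Emulates onTrigger: bind the current session, poll once, then unbind. */
    public void onTrigger(Session session) {
        channel.setSession(session);
        try {
            source.process();
        } finally {
            channel.setSession(null); // avoid leaking a finished session into the next trigger
        }
    }

    public static void main(String[] args) {
        ReusableChannelSketch processor = new ReusableChannelSketch();
        Session first = new Session();
        Session second = new Session();
        processor.onTrigger(first);
        processor.onTrigger(second);
        System.out.println(first.received + " " + second.received);
    }
}
```

Because the channel holds a single mutable session, this design only works if triggers do not overlap, so it relies on the processor being run by one thread at a time.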



[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by busbey <gi...@git.apache.org>.
Github user busbey commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-99092420
  
    Could you add a README that explains what the processor set is trying to do and what restrictions it has (e.g. which versions of HDFS the HDFS sink works with, which versions of the Twitter API the Twitter source works with, etc.), or pointers to where people can learn about the included components?



[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r32168365
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java ---
    @@ -0,0 +1,146 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +import com.google.common.base.Throwables;
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.ImmutableSet;
    +import java.util.List;
    +import java.util.Set;
    +import org.apache.flume.Context;
    +import org.apache.flume.EventDeliveryException;
    +import org.apache.flume.Sink;
    +import org.apache.flume.conf.Configurables;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.SchedulingContext;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +/**
    + * This processor runs a Flume sink
    + */
    +@Tags({"flume", "hadoop", "get", "sink"})
    +@CapabilityDescription("Write FlowFile data to a Flume sink")
    +public class FlumeSinkProcessor extends AbstractFlumeProcessor {
    +
    +    public static final PropertyDescriptor SINK_TYPE = new PropertyDescriptor.Builder()
    +            .name("Sink Type")
    +            .description("The fully-qualified name of the Sink class")
    +            .required(true)
    +            .addValidator(createSinkValidator())
    +            .build();
    +    public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder()
    +            .name("Agent Name")
    +            .description("The name of the agent used in the Flume sink configuration")
    +            .required(true)
    +            .defaultValue("tier1")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder()
    +            .name("Sink Name")
    +            .description("The name of the sink used in the Flume sink configuration")
    +            .required(true)
    +            .defaultValue("sink-1")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder()
    +            .name("Flume Configuration")
    +            .description("The Flume configuration for the sink copied from the flume.properties file")
    +            .required(true)
    +            .defaultValue("")
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
    +    public static final Relationship FAILURE = new Relationship.Builder().name("failure").build();
    +
    +    private List<PropertyDescriptor> descriptors;
    +    private Set<Relationship> relationships;
    +
    +    private volatile Sink sink;
    +    private volatile NifiSinkSessionChannel channel;
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG);
    +        this.relationships = ImmutableSet.of(SUCCESS, FAILURE);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final SchedulingContext context) {
    +        try {
    +            channel = new NifiSinkSessionChannel(SUCCESS, FAILURE);
    +            Configurables.configure(channel, new Context());
    +            channel.start();
    +
    +            sink = SINK_FACTORY.create(context.getProperty(SOURCE_NAME).getValue(),
    +                    context.getProperty(SINK_TYPE).getValue());
    +            sink.setChannel(channel);
    +
    +            String flumeConfig = context.getProperty(FLUME_CONFIG).getValue();
    +            String agentName = context.getProperty(AGENT_NAME).getValue();
    +            String sinkName = context.getProperty(SOURCE_NAME).getValue();
    +            Configurables.configure(sink,
    +                    getFlumeSinkContext(flumeConfig, agentName, sinkName));
    +
    +            sink.start();
    +        } catch (Throwable th) {
    +            getLogger().error("Error creating sink", th);
    +            throw Throwables.propagate(th);
    +        }
    +    }
    +
    +    @OnUnscheduled
    +    public void unScheduled() {
    +        sink.stop();
    +        channel.stop();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context,
    +            final ProcessSession session) throws ProcessException {
    +
    +        channel.setSession(session);
    +        try {
    +            if (sink.process() == Sink.Status.BACKOFF) {
    --- End diff --
    
    Regarding docs on our metrics API: I believe the metrics information is closed off from processors, so they cannot inject anything custom.  The only game in town, I believe, is the counters mechanism, which solely exists for Sean and Tony :-)  Which is admittedly a good enough reason.  What sorts of things would you like to do with the metrics?



[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by busbey <gi...@git.apache.org>.
Github user busbey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r29686853
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java ---
    @@ -0,0 +1,165 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +import java.io.File;
    +import static org.junit.Assert.assertEquals;
    +
    +import java.io.FileInputStream;
    +import java.io.FilenameFilter;
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import org.apache.commons.io.filefilter.HiddenFileFilter;
    +import org.apache.flume.sink.NullSink;
    +import org.apache.flume.source.AvroSource;
    +
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.util.MockProcessContext;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.apache.nifi.util.file.FileUtils;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class FlumeSinkProcessorTest {
    +
    +  private static final Logger logger =
    +      LoggerFactory.getLogger(FlumeSinkProcessorTest.class);
    +  
    +    @Test
    +    public void testValidators() {
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
    +        Collection<ValidationResult> results;
    +        ProcessContext pc;
    +
    +        results = new HashSet<>();
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because Sink Type is required"));
    +        }
    +
    +        // non-existent class
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, "invalid.class.name");
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because unable to load sink"));
    +        }
    +
    +        // class doesn't implement Sink
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, AvroSource.class.getName());
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because unable to create sink"));
    +        }
    +
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(0, results.size());
    +    }
    +
    +
    +    @Test
    +    public void testNullSink() throws IOException {
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
    +        FileInputStream fis = new FileInputStream("src/test/resources/testdata/records.txt");
    +        Map<String, String> attributes = new HashMap<>();
    +        attributes.put(CoreAttributes.FILENAME.key(), "records.txt");
    +        runner.enqueue(fis, attributes);
    +        runner.run();
    +        fis.close();
    +    }
    +
    +    @Test
    +    public void testBatchSize() throws IOException {
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
    +        runner.setProperty(FlumeSinkProcessor.BATCH_SIZE, "1000");
    +        runner.setProperty(FlumeSinkProcessor.FLUME_CONFIG,
    +            "tier1.sinks.sink-1.batchSize = 1000\n");
    +        for (int i = 0; i < 100000; i++) {
    +          runner.enqueue(String.valueOf(i).getBytes());
    +        }
    +        runner.run();
    +    }
    +    
    +    @Test
    +    public void testHdfsSink() throws IOException {
    +        File destDir = new File("target/hdfs");
    +        if (destDir.exists()) {
    +          FileUtils.deleteFilesInDir(destDir, null, logger);
    +        } else {
    +          destDir.mkdirs();
    +        }
    +
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, "hdfs");
    +        runner.setProperty(FlumeSinkProcessor.FLUME_CONFIG,
    +            "tier1.sinks.sink-1.hdfs.path = " + destDir.toURI().toString() + "\n" +
    +            "tier1.sinks.sink-1.hdfs.fileType = DataStream\n" +
    +            "tier1.sinks.sink-1.hdfs.serializer = TEXT\n" +
    +            "tier1.sinks.sink-1.serializer.appendNewline = false"
    +        );
    +        FileInputStream fis = new FileInputStream("src/test/resources/testdata/records.txt");
    +        Map<String, String> attributes = new HashMap<>();
    +        attributes.put(CoreAttributes.FILENAME.key(), "records.txt");
    +        runner.enqueue(fis, attributes);
    +        runner.run();
    +        fis.close();
    +
    +        File[] files = destDir.listFiles((FilenameFilter)HiddenFileFilter.VISIBLE);
    +        assertEquals("Unexpected number of destination files.", 1, files.length);
    +        File dst = files[0];
    +        byte[] expectedMd5 = FileUtils.computeMd5Digest(new File("src/test/resources/testdata/records.txt"));
    --- End diff --
    
    Please use getResourceAsStream so that this test resource is loaded from the classpath (Maven's target/ directory) rather than from src/. This will allow us to add pre-processing later if necessary.
    
    i.e.
    
    InputStream is = getClass().getResourceAsStream("/testdata/records.txt");
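
As a minimal sketch of the classpath-based loading suggested above: the class and method names here are hypothetical, and a temporary directory stands in for Maven's target/test-classes so that the example runs on its own. Note that ClassLoader.getResourceAsStream takes a name without a leading slash, whereas Class.getResourceAsStream (as in the one-liner above) uses a leading slash for an absolute name.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical example class; not part of the pull request.
class ClasspathStreamExample {

    // Reads a classpath resource fully into a UTF-8 String.
    static String readResource(ClassLoader loader, String name) {
        try (InputStream in = loader.getResourceAsStream(name)) {
            if (in == null) {
                throw new IllegalArgumentException("Resource not found: " + name);
            }
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Builds a temporary directory that plays the role of target/test-classes,
    // writes testdata/records.txt into it, and reads it back via a class loader.
    static String demo() {
        try {
            Path root = Files.createTempDirectory("test-classes");
            Path dir = Files.createDirectories(root.resolve("testdata"));
            Files.write(dir.resolve("records.txt"),
                    "record-1\nrecord-2\n".getBytes(StandardCharsets.UTF_8));
            try (URLClassLoader loader =
                    new URLClassLoader(new URL[] {root.toUri().toURL()})) {
                return readResource(loader, "testdata/records.txt");
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.print(demo());
    }
}
```

Because the test only sees a stream, it keeps working if the resource is later filtered, generated, or packaged into a jar.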


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r32167935
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java ---
    @@ -0,0 +1,146 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +import com.google.common.base.Throwables;
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.ImmutableSet;
    +import java.util.List;
    +import java.util.Set;
    +import org.apache.flume.Context;
    +import org.apache.flume.EventDeliveryException;
    +import org.apache.flume.Sink;
    +import org.apache.flume.conf.Configurables;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.SchedulingContext;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +/**
    + * This processor runs a Flume sink
    + */
    +@Tags({"flume", "hadoop", "get", "sink"})
    +@CapabilityDescription("Write FlowFile data to a Flume sink")
    +public class FlumeSinkProcessor extends AbstractFlumeProcessor {
    +
    +    public static final PropertyDescriptor SINK_TYPE = new PropertyDescriptor.Builder()
    +            .name("Sink Type")
    +            .description("The fully-qualified name of the Sink class")
    +            .required(true)
    +            .addValidator(createSinkValidator())
    +            .build();
    +    public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder()
    +            .name("Agent Name")
    +            .description("The name of the agent used in the Flume sink configuration")
    +            .required(true)
    +            .defaultValue("tier1")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder()
    +            .name("Sink Name")
    +            .description("The name of the sink used in the Flume sink configuration")
    +            .required(true)
    +            .defaultValue("sink-1")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder()
    +            .name("Flume Configuration")
    +            .description("The Flume configuration for the sink copied from the flume.properties file")
    +            .required(true)
    +            .defaultValue("")
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
    +    public static final Relationship FAILURE = new Relationship.Builder().name("failure").build();
    +
    +    private List<PropertyDescriptor> descriptors;
    +    private Set<Relationship> relationships;
    +
    +    private volatile Sink sink;
    +    private volatile NifiSinkSessionChannel channel;
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG);
    +        this.relationships = ImmutableSet.of(SUCCESS, FAILURE);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final SchedulingContext context) {
    +        try {
    +            channel = new NifiSinkSessionChannel(SUCCESS, FAILURE);
    +            Configurables.configure(channel, new Context());
    +            channel.start();
    +
    +            sink = SINK_FACTORY.create(context.getProperty(SOURCE_NAME).getValue(),
    +                    context.getProperty(SINK_TYPE).getValue());
    +            sink.setChannel(channel);
    +
    +            String flumeConfig = context.getProperty(FLUME_CONFIG).getValue();
    +            String agentName = context.getProperty(AGENT_NAME).getValue();
    +            String sinkName = context.getProperty(SOURCE_NAME).getValue();
    +            Configurables.configure(sink,
    +                    getFlumeSinkContext(flumeConfig, agentName, sinkName));
    +
    +            sink.start();
    +        } catch (Throwable th) {
    +            getLogger().error("Error creating sink", th);
    +            throw Throwables.propagate(th);
    +        }
    +    }
    +
    +    @OnUnscheduled
    +    public void unScheduled() {
    +        sink.stop();
    +        channel.stop();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context,
    +            final ProcessSession session) throws ProcessException {
    +
    +        channel.setSession(session);
    +        try {
    +            if (sink.process() == Sink.Status.BACKOFF) {
    --- End diff --
    
    Ryan's view on context.yield is in line with its intent. Generally, it is a way for the processor to tell the framework that it doesn't want to be scheduled for a while because some transient issue is occurring that it expects will clear up 'later'.
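
To illustrate the intent described above, here is a minimal, self-contained sketch. The Status enum and Context interface are hypothetical stand-ins for Flume's Sink.Status and NiFi's ProcessContext, used only so the example compiles without either library. Yielding when the sink reports BACKOFF is an assumption taken from the diff line under discussion, not a statement of what the final processor does.

```java
// Hypothetical example class; not part of the pull request.
class YieldOnBackoffSketch {

    // Stand-in for Flume's Sink.Status (assumption for illustration).
    enum Status { READY, BACKOFF }

    // Stand-in for the one ProcessContext method used here (assumption).
    interface Context {
        void yield();
    }

    // Test double that records how often the processor asked to be left alone.
    static class CountingContext implements Context {
        int yields;

        @Override
        public void yield() {
            yields++;
        }
    }

    // One scheduling pass: if the sink had nothing to do, tell the framework
    // not to schedule this processor again for a while.
    static void onTrigger(Status sinkResult, Context context) {
        if (sinkResult == Status.BACKOFF) {
            context.yield();
        }
    }

    // Runs a sequence of simulated sink results and returns the yield count.
    static int countYields(Status... results) {
        CountingContext context = new CountingContext();
        for (Status result : results) {
            onTrigger(result, context);
        }
        return context.yields;
    }

    public static void main(String[] args) {
        System.out.println(countYields(
                Status.READY, Status.BACKOFF, Status.READY, Status.BACKOFF));
    }
}
```

The point of the pattern is that the processor does not sleep or spin itself; it only signals the condition and lets the framework decide when to schedule it again.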


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joey <gi...@git.apache.org>.
Github user joey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r32167867
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java ---
    @@ -0,0 +1,146 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +import com.google.common.base.Throwables;
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.ImmutableSet;
    +import java.util.List;
    +import java.util.Set;
    +import org.apache.flume.Context;
    +import org.apache.flume.EventDeliveryException;
    +import org.apache.flume.Sink;
    +import org.apache.flume.conf.Configurables;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.SchedulingContext;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +/**
    + * This processor runs a Flume sink
    + */
    +@Tags({"flume", "hadoop", "get", "sink"})
    +@CapabilityDescription("Write FlowFile data to a Flume sink")
    +public class FlumeSinkProcessor extends AbstractFlumeProcessor {
    +
    +    public static final PropertyDescriptor SINK_TYPE = new PropertyDescriptor.Builder()
    +            .name("Sink Type")
    +            .description("The fully-qualified name of the Sink class")
    +            .required(true)
    +            .addValidator(createSinkValidator())
    +            .build();
    +    public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder()
    +            .name("Agent Name")
    +            .description("The name of the agent used in the Flume sink configuration")
    +            .required(true)
    +            .defaultValue("tier1")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder()
    +            .name("Sink Name")
    +            .description("The name of the sink used in the Flume sink configuration")
    +            .required(true)
    +            .defaultValue("sink-1")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder()
    +            .name("Flume Configuration")
    +            .description("The Flume configuration for the sink copied from the flume.properties file")
    +            .required(true)
    +            .defaultValue("")
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
    +    public static final Relationship FAILURE = new Relationship.Builder().name("failure").build();
    +
    +    private List<PropertyDescriptor> descriptors;
    +    private Set<Relationship> relationships;
    +
    +    private volatile Sink sink;
    +    private volatile NifiSinkSessionChannel channel;
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG);
    +        this.relationships = ImmutableSet.of(SUCCESS, FAILURE);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final SchedulingContext context) {
    +        try {
    +            channel = new NifiSinkSessionChannel(SUCCESS, FAILURE);
    +            Configurables.configure(channel, new Context());
    +            channel.start();
    +
    +            sink = SINK_FACTORY.create(context.getProperty(SOURCE_NAME).getValue(),
    +                    context.getProperty(SINK_TYPE).getValue());
    +            sink.setChannel(channel);
    +
    +            String flumeConfig = context.getProperty(FLUME_CONFIG).getValue();
    +            String agentName = context.getProperty(AGENT_NAME).getValue();
    +            String sinkName = context.getProperty(SOURCE_NAME).getValue();
    +            Configurables.configure(sink,
    +                    getFlumeSinkContext(flumeConfig, agentName, sinkName));
    +
    +            sink.start();
    +        } catch (Throwable th) {
    +            getLogger().error("Error creating sink", th);
    +            throw Throwables.propagate(th);
    +        }
    +    }
    +
    +    @OnUnscheduled
    +    public void unScheduled() {
    +        sink.stop();
    +        channel.stop();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context,
    +            final ProcessSession session) throws ProcessException {
    +
    +        channel.setSession(session);
    +        try {
    +            if (sink.process() == Sink.Status.BACKOFF) {
    --- End diff --
    
    That makes sense. The description of yield wasn't clear to me.
    
    I might be able to get access to Flume counters. Are there docs on the NiFi metrics API?


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joey <gi...@git.apache.org>.
Github user joey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r32038444
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java ---
    @@ -0,0 +1,191 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +import com.google.common.base.Throwables;
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.ImmutableSet;
    +import java.util.List;
    +import java.util.Set;
    +import org.apache.flume.Context;
    +import org.apache.flume.Event;
    +import org.apache.flume.EventDeliveryException;
    +import org.apache.flume.EventDrivenSource;
    +import org.apache.flume.PollableSource;
    +import org.apache.flume.Source;
    +import org.apache.flume.SourceRunner;
    +import org.apache.flume.Transaction;
    +import org.apache.flume.channel.ChannelProcessor;
    +import org.apache.flume.channel.MemoryChannel;
    +import org.apache.flume.conf.Configurables;
    +import org.apache.flume.source.EventDrivenSourceRunner;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.SchedulingContext;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +/**
    + * This processor runs a Flume source
    + */
    +@Tags({"flume", "hadoop", "get", "source"})
    +@CapabilityDescription("Generate FlowFile data from a Flume source")
    +public class FlumeSourceProcessor extends AbstractFlumeProcessor {
    +
    +    private Source source;
    +    private SourceRunner runner;
    +    private MemoryChannel channel;
    +
    +    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder()
    +            .name("Source Type")
    +            .description("The fully-qualified name of the Source class")
    +            .required(true)
    +            .addValidator(createSourceValidator())
    +            .build();
    +    public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder()
    +            .name("Agent Name")
    +            .description("The name of the agent used in the Flume source configuration")
    +            .required(true)
    +            .defaultValue("tier1")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder()
    +            .name("Source Name")
    +            .description("The name of the source used in the Flume source configuration")
    +            .required(true)
    +            .defaultValue("src-1")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder()
    +            .name("Flume Configuration")
    +            .description("The Flume configuration for the source copied from the flume.properties file")
    +            .required(true)
    +            .defaultValue("")
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
    +
    +    private List<PropertyDescriptor> descriptors;
    +    private Set<Relationship> relationships;
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        this.descriptors = ImmutableList.of(SOURCE_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG);
    +        this.relationships = ImmutableSet.of(SUCCESS);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final SchedulingContext context) {
    +        try {
    +            source = SOURCE_FACTORY.create(
    +                    context.getProperty(SOURCE_NAME).getValue(),
    +                    context.getProperty(SOURCE_TYPE).getValue());
    +
    +            String flumeConfig = context.getProperty(FLUME_CONFIG).getValue();
    +            String agentName = context.getProperty(AGENT_NAME).getValue();
    +            String sourceName = context.getProperty(SOURCE_NAME).getValue();
    +            Configurables.configure(source,
    +                    getFlumeSourceContext(flumeConfig, agentName, sourceName));
    +
    +            if (source instanceof EventDrivenSource) {
    +                runner = new EventDrivenSourceRunner();
    +                channel = new MemoryChannel();
    +                Configurables.configure(channel, new Context());
    +                channel.start();
    +                source.setChannelProcessor(new ChannelProcessor(new NifiChannelSelector(channel)));
    +                runner.setSource(source);
    +                runner.start();
    +            }
    +        } catch (Throwable th) {
    +            getLogger().error("Error creating source", th);
    +            throw Throwables.propagate(th);
    +        }
    +    }
    +
    +    @OnUnscheduled
    +    public void unScheduled() {
    +        if (runner != null) {
    +            runner.stop();
    +        }
    +        if (channel != null) {
    +            channel.stop();
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context,
    +            final ProcessSession session) throws ProcessException {
    +        if (source instanceof EventDrivenSource) {
    +            onEventDrivenTrigger(context, session);
    +        } else if (source instanceof PollableSource) {
    +            onPollableTrigger((PollableSource) source, context, session);
    +        }
    +    }
    +
    +    public void onPollableTrigger(final PollableSource pollableSource,
    +            final ProcessContext context, final ProcessSession session)
    +            throws ProcessException {
    +        try {
    +            pollableSource.setChannelProcessor(new ChannelProcessor(
    +                    new NifiChannelSelector(new NifiChannel(session, SUCCESS))));
    --- End diff --
    
    Yes that should work.


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joey <gi...@git.apache.org>.
Github user joey commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-120740492
  
    Sorry for the delay. I've been looking at the threading model, and I'm convinced that for the first version we should mark the processors as serial only.
    
    This matches the behavior of 


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by busbey <gi...@git.apache.org>.
Github user busbey commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-109367430
  
    Doing the incremental commit as its own thing means we could do either, though: the "changed files" tab shows the change as a whole, and the individual commit shows just the increment.
    
    Squashing everything, on the other hand, means that if things work out we don't have to wait for another round trip of communication to get the ready-to-push commit, and I can still use vimdiff against the previous change.


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-109343232
  
    I have reached out to Joey on this. He plans to incorporate Sean's changes and reach out to the Flume community; we want to make sure they're comfortable with this as well. Joey is quite time-crunched, so if he would like to pass the baton we would be happy to try to pick it up. Great contrib.


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by mcgilman <gi...@git.apache.org>.
Github user mcgilman commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-121392432
  
    @joey I've pulled your changes into develop. Successful build, contrib-check passed, ran a sample flow pulling from one directory and placing into another. Everything looked good. I made a few tweaks to the NOTICE file you provided and rolled up the dependent licenses into the assembly's NOTICE file. Thanks!


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by busbey <gi...@git.apache.org>.
Github user busbey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r29687085
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java ---
    @@ -0,0 +1,140 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.HashSet;
    +import java.util.List;
    +import org.apache.flume.sink.NullSink;
    +import org.apache.flume.source.AvroSource;
    +
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.util.MockFlowFile;
    +import org.apache.nifi.util.MockProcessContext;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.apache.nifi.util.file.FileUtils;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class FlumeSourceProcessorTest {
    +
    +  private static final Logger logger =
    +      LoggerFactory.getLogger(FlumeSourceProcessorTest.class);
    +
    +
    +    @Test
    +    public void testValidators() {
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSourceProcessor.class);
    +        Collection<ValidationResult> results;
    +        ProcessContext pc;
    +
    +        results = new HashSet<>();
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because Source Type is required"));
    +        }
    +
    +        // non-existent class
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, "invalid.class.name");
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because unable to load source"));
    +        }
    +
    +        // class doesn't implement Source
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, NullSink.class.getName());
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because unable to create source"));
    +        }
    +
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, AvroSource.class.getName());
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(0, results.size());
    +    }
    +
    +    @Test
    +    public void testSequenceSource() {
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSourceProcessor.class);
    +        runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, "seq");
    +        runner.run();
    +        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FlumeSourceProcessor.SUCCESS);
    +        Assert.assertEquals(1, flowFiles.size());
    +        for (MockFlowFile flowFile : flowFiles) {
    +          logger.error(flowFile.toString());
    +            Assert.assertEquals(1, flowFile.getSize());
    +        }
    +    }
    +
    +    @Test
    +    public void testSourceWithConfig() throws IOException {
    +        File spoolDirectory = new File("target/spooldir");
    +        if (spoolDirectory.exists()) {
    +          FileUtils.deleteFilesInDir(spoolDirectory, null, logger);
    +        } else {
    +          spoolDirectory.mkdirs();
    +        }
    +        File src = new File("src/test/resources/testdata/records.txt");
    --- End diff --
    
    Please use getResource (or getResourceAsStream) so that this test resource is loaded from the classpath, i.e. out of Maven's target/ directory, rather than from src/. This will allow us to add pre-processing later if necessary.
    
    e.g.
    
        File src = new File(getClass().getResource("/testdata/records.txt").toURI());
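A minimal sketch of the lookup suggested above, written as a standalone helper. The class name `TestResources` and the method `asFile` are hypothetical and not part of the pull request; the sketch only assumes the resource sits on the test classpath as a plain file (a resource packed inside a jar cannot be turned into a `File`).

```java
import java.io.File;
import java.net.URISyntaxException;
import java.net.URL;

// Hypothetical helper for illustration; not part of the pull request.
final class TestResources {
    private TestResources() {
    }

    /**
     * Resolves a classpath resource to a File.
     * A leading '/' makes the name relative to the classpath root.
     */
    static File asFile(Class<?> anchor, String name) {
        URL url = anchor.getResource(name);
        if (url == null) {
            // getResource returns null instead of throwing when nothing matches
            throw new IllegalArgumentException("resource not found on classpath: " + name);
        }
        try {
            // Works for file: URLs only, which is the usual case under target/test-classes
            return new File(url.toURI());
        } catch (URISyntaxException e) {
            throw new IllegalStateException("cannot convert to a file: " + url, e);
        }
    }
}
```

In a test this would be called as `TestResources.asFile(getClass(), "/testdata/records.txt")`. Checking for `null` up front gives a clearer failure than the `NullPointerException` the one-liner would raise if the resource were missing.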



---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joey <gi...@git.apache.org>.
Github user joey commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-109409663
  
    Based on @busbey's last comment, I'll leave out the squash for now in case anyone wants to review just the changes since the last PR.
    
    I'm also going to leave out a rebase unless you want that now. I figured I'd get @busbey's review done and then do a rebase and squash before handing it off to a reviewer from Flume.


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by busbey <gi...@git.apache.org>.
Github user busbey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r29686956
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java ---
    @@ -0,0 +1,140 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.HashSet;
    +import java.util.List;
    +import org.apache.flume.sink.NullSink;
    +import org.apache.flume.source.AvroSource;
    +
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.util.MockFlowFile;
    +import org.apache.nifi.util.MockProcessContext;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.apache.nifi.util.file.FileUtils;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class FlumeSourceProcessorTest {
    +
    +  private static final Logger logger =
    +      LoggerFactory.getLogger(FlumeSourceProcessorTest.class);
    +
    +
    +    @Test
    +    public void testValidators() {
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSourceProcessor.class);
    +        Collection<ValidationResult> results;
    +        ProcessContext pc;
    +
    +        results = new HashSet<>();
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because Source Type is required"));
    +        }
    +
    +        // non-existent class
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, "invalid.class.name");
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because unable to load source"));
    +        }
    +
    +        // class doesn't implement Source
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, NullSink.class.getName());
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because unable to create source"));
    +        }
    +
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, AvroSource.class.getName());
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(0, results.size());
    +    }
    +
    +    @Test
    +    public void testSequenceSource() {
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSourceProcessor.class);
    +        runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, "seq");
    +        runner.run();
    +        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FlumeSourceProcessor.SUCCESS);
    +        Assert.assertEquals(1, flowFiles.size());
    +        for (MockFlowFile flowFile : flowFiles) {
    +          logger.error(flowFile.toString());
    +            Assert.assertEquals(1, flowFile.getSize());
    +        }
    +    }
    +
    +    @Test
    +    public void testSourceWithConfig() throws IOException {
    +        File spoolDirectory = new File("target/spooldir");
    --- End diff --
    
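    please get this from junit / maven, for example by using JUnit's TemporaryFolder rule.
    
        @Rule
        public final TemporaryFolder temp = new TemporaryFolder();
    
        @Test
        public void someTest() throws IOException {
            // newFolder creates the directory under a per-test temporary root
            File destDir = temp.newFolder("spooldir");
            // ... use destDir as the spooling directory ...
        }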
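For readers without JUnit at hand, the create-then-delete cycle that a temporary-folder rule provides can be approximated with the JDK alone. The sketch below is a stand-in, not JUnit's implementation: the class name `ScratchDir` and its methods are hypothetical, and it uses try-with-resources where JUnit would use a rule.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical stand-in for the lifecycle of a temporary-folder rule.
final class ScratchDir implements AutoCloseable {
    private final Path root;

    ScratchDir(String prefix) {
        try {
            // Created under java.io.tmpdir, so nothing is left in the build tree
            root = Files.createTempDirectory(prefix);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Creates a named sub-directory under the temporary root. */
    Path newFolder(String name) {
        try {
            return Files.createDirectory(root.resolve(name));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void close() {
        // Reverse order visits children before their parents, so each delete succeeds
        try (Stream<Path> paths = Files.walk(root)) {
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Used as `try (ScratchDir dir = new ScratchDir("flume-test")) { Path spool = dir.newFolder("spooldir"); ... }`, the directory tree is removed when the block exits, which is the property the review comment is after.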


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by busbey <gi...@git.apache.org>.
Github user busbey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r29672158
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannel.java ---
    @@ -0,0 +1,31 @@
    +
    --- End diff --
    
    This class needs a license notice at the top.


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-109359927
  
    Ahh, I see. I'd like to see the entire patch as a new entry so it can be reviewed in whole again.


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by busbey <gi...@git.apache.org>.
Github user busbey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r31841961
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java ---
    @@ -0,0 +1,165 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +import java.io.File;
    +import static org.junit.Assert.assertEquals;
    +
    +import java.io.FileInputStream;
    +import java.io.FilenameFilter;
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import org.apache.commons.io.filefilter.HiddenFileFilter;
    +import org.apache.flume.sink.NullSink;
    +import org.apache.flume.source.AvroSource;
    +
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.util.MockProcessContext;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.apache.nifi.util.file.FileUtils;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class FlumeSinkProcessorTest {
    +
    +  private static final Logger logger =
    +      LoggerFactory.getLogger(FlumeSinkProcessorTest.class);
    +  
    +    @Test
    +    public void testValidators() {
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
    +        Collection<ValidationResult> results;
    +        ProcessContext pc;
    +
    +        results = new HashSet<>();
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because Sink Type is required"));
    +        }
    +
    +        // non-existent class
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, "invalid.class.name");
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because unable to load sink"));
    +        }
    +
    +        // class doesn't implement Sink
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, AvroSource.class.getName());
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because unable to create sink"));
    +        }
    +
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(0, results.size());
    +    }
    +
    +
    +    @Test
    +    public void testNullSink() throws IOException {
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
    +        FileInputStream fis = new FileInputStream("src/test/resources/testdata/records.txt");
    +        Map<String, String> attributes = new HashMap<>();
    +        attributes.put(CoreAttributes.FILENAME.key(), "records.txt");
    +        runner.enqueue(fis, attributes);
    +        runner.run();
    +        fis.close();
    +    }
    +
    +    @Test
    +    public void testBatchSize() throws IOException {
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
    +        runner.setProperty(FlumeSinkProcessor.BATCH_SIZE, "1000");
    +        runner.setProperty(FlumeSinkProcessor.FLUME_CONFIG,
    +            "tier1.sinks.sink-1.batchSize = 1000\n");
    +        for (int i = 0; i < 100000; i++) {
    +          runner.enqueue(String.valueOf(i).getBytes());
    +        }
    +        runner.run();
    +    }
    +    
    +    @Test
    +    public void testHdfsSink() throws IOException {
    +        File destDir = new File("target/hdfs");
    --- End diff --
    
    TemporaryFolder guarantees that the folders are destroyed at the end of the test cycle, so there's nothing for maven clean to handle.


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by busbey <gi...@git.apache.org>.
Github user busbey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r29672239
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannel.java ---
    @@ -0,0 +1,31 @@
    +
    +package org.apache.nifi.processors.flume;
    +
    +import org.apache.flume.Context;
    +import org.apache.flume.channel.BasicChannelSemantics;
    +import org.apache.flume.channel.BasicTransactionSemantics;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +
    +
    +public class NifiChannel extends BasicChannelSemantics {
    +  private final ProcessSession session;
    +  private final Relationship relationship;
    +
    +  public NifiChannel(ProcessSession session, Relationship relationship) {
    +    this.session = session;
    +    this.relationship = relationship;
    +  }
    +
    +  @Override
    +  protected BasicTransactionSemantics createTransaction() {
    +    return new NifiTransaction(session, relationship);
    +  }
    +
    +  @Override
    +  public void configure(Context context) {
    +    throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
    --- End diff --
    
    please leave out the autogenerated comment.
    
    does this need to throw? could it just be a no-op?


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by busbey <gi...@git.apache.org>.
Github user busbey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r29672315
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannelSelector.java ---
    @@ -0,0 +1,55 @@
    +
    +package org.apache.nifi.processors.flume;
    +
    +import com.google.common.collect.ImmutableList;
    +import java.util.List;
    +import org.apache.flume.Channel;
    +import org.apache.flume.ChannelSelector;
    +import org.apache.flume.Context;
    +import org.apache.flume.Event;
    +
    +
    +public class NifiChannelSelector implements ChannelSelector {
    +  private String name;
    +  private final List<Channel> requiredChannels;
    +  private final List<Channel> optionalChannels;
    +
    +  public NifiChannelSelector(Channel channel) {
    +    requiredChannels = ImmutableList.of(channel);
    +    optionalChannels = ImmutableList.of();
    +  }
    +
    +  @Override
    +  public List<Channel> getRequiredChannels(Event event) {
    +    return requiredChannels;
    +  }
    +
    +  @Override
    +  public List<Channel> getOptionalChannels(Event event) {
    +    return optionalChannels;
    +  }
    +
    +  @Override
    +  public List<Channel> getAllChannels() {
    +    return requiredChannels;
    +  }
    +
    +  @Override
    +  public void setChannels(List<Channel> channels) {
    +    throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
    --- End diff --
    
    please leave out autogenerated comments.


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by busbey <gi...@git.apache.org>.
Github user busbey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r29693520
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/testdata/records.txt ---
    @@ -0,0 +1,4 @@
    +record 1
    --- End diff --
    
    please either add a license header or add this file to the list of exemptions for apache-rat:check


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r32167493
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java ---
    @@ -0,0 +1,146 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +import com.google.common.base.Throwables;
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.ImmutableSet;
    +import java.util.List;
    +import java.util.Set;
    +import org.apache.flume.Context;
    +import org.apache.flume.EventDeliveryException;
    +import org.apache.flume.Sink;
    +import org.apache.flume.conf.Configurables;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.SchedulingContext;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +/**
    + * This processor runs a Flume sink
    + */
    +@Tags({"flume", "hadoop", "get", "sink"})
    +@CapabilityDescription("Write FlowFile data to a Flume sink")
    +public class FlumeSinkProcessor extends AbstractFlumeProcessor {
    +
    +    public static final PropertyDescriptor SINK_TYPE = new PropertyDescriptor.Builder()
    +            .name("Sink Type")
    +            .description("The fully-qualified name of the Sink class")
    +            .required(true)
    +            .addValidator(createSinkValidator())
    +            .build();
    +    public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder()
    +            .name("Agent Name")
    +            .description("The name of the agent used in the Flume sink configuration")
    +            .required(true)
    +            .defaultValue("tier1")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder()
    +            .name("Sink Name")
    +            .description("The name of the sink used in the Flume sink configuration")
    +            .required(true)
    +            .defaultValue("sink-1")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder()
    +            .name("Flume Configuration")
    +            .description("The Flume configuration for the sink copied from the flume.properties file")
    +            .required(true)
    +            .defaultValue("")
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
    +    public static final Relationship FAILURE = new Relationship.Builder().name("failure").build();
    +
    +    private List<PropertyDescriptor> descriptors;
    +    private Set<Relationship> relationships;
    +
    +    private volatile Sink sink;
    +    private volatile NifiSinkSessionChannel channel;
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG);
    +        this.relationships = ImmutableSet.of(SUCCESS, FAILURE);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final SchedulingContext context) {
    +        try {
    +            channel = new NifiSinkSessionChannel(SUCCESS, FAILURE);
    +            Configurables.configure(channel, new Context());
    +            channel.start();
    +
    +            sink = SINK_FACTORY.create(context.getProperty(SOURCE_NAME).getValue(),
    +                    context.getProperty(SINK_TYPE).getValue());
    +            sink.setChannel(channel);
    +
    +            String flumeConfig = context.getProperty(FLUME_CONFIG).getValue();
    +            String agentName = context.getProperty(AGENT_NAME).getValue();
    +            String sinkName = context.getProperty(SOURCE_NAME).getValue();
    +            Configurables.configure(sink,
    +                    getFlumeSinkContext(flumeConfig, agentName, sinkName));
    +
    +            sink.start();
    +        } catch (Throwable th) {
    +            getLogger().error("Error creating sink", th);
    +            throw Throwables.propagate(th);
    +        }
    +    }
    +
    +    @OnUnscheduled
    +    public void unScheduled() {
    +        sink.stop();
    +        channel.stop();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context,
    +            final ProcessSession session) throws ProcessException {
    +
    +        channel.setSession(session);
    +        try {
    +            if (sink.process() == Sink.Status.BACKOFF) {
    --- End diff --
    
    I don't think backoff should make this call `context.yield()`. In Flume, backoff is used to signal the sink runner that there is no more input, but NiFi already manages things like that. My understanding is that in NiFi, yield means it can't process for some external reason so there is no point in triggering even if there is input. (For example, can't connect to a DB so wait a few seconds for it to come up and retry.) That's not what we want to signal here, though. I think this should just call process and maybe increment a counter for how many times the result is backoff vs ready.
    
    Speaking of counters, is there a way to hijack Flume's counters?
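The suggestion above (call process, do not yield on backoff, and count the two outcomes) might look roughly like the following. This is a sketch under stated assumptions: `SinkStatus` and `SinkLike` are stand-ins that only mirror the shape of Flume's `Sink.Status` and `Sink.process()`, and `SinkTrigger` is a hypothetical name. None of it is the real Flume or NiFi API, and the real `process()` also declares a checked exception that is omitted here.

```java
import java.util.EnumMap;
import java.util.Map;

// Stand-in for Flume's Sink.Status; for illustration only.
enum SinkStatus { READY, BACKOFF }

// Stand-in for the single method of a Flume sink that matters here.
interface SinkLike {
    SinkStatus process();
}

// Hypothetical trigger logic: run the sink once per call and record the result.
final class SinkTrigger {
    private final Map<SinkStatus, Long> counts = new EnumMap<>(SinkStatus.class);

    /** Runs the sink once; BACKOFF is counted but does not cause a yield. */
    SinkStatus trigger(SinkLike sink) {
        SinkStatus status = sink.process();
        counts.merge(status, 1L, Long::sum);
        return status;
    }

    /** Number of times the given status has been observed so far. */
    long count(SinkStatus status) {
        return counts.getOrDefault(status, 0L);
    }
}
```

In the actual processor the tallies would presumably be reported through whatever counter facility the NiFi version in use exposes rather than held in a map; the map is only there to keep the sketch self-contained.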


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by busbey <gi...@git.apache.org>.
Github user busbey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r29673192
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java ---
    @@ -0,0 +1,165 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +import java.io.File;
    +import static org.junit.Assert.assertEquals;
    +
    +import java.io.FileInputStream;
    +import java.io.FilenameFilter;
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import org.apache.commons.io.filefilter.HiddenFileFilter;
    +import org.apache.flume.sink.NullSink;
    +import org.apache.flume.source.AvroSource;
    +
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.util.MockProcessContext;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.apache.nifi.util.file.FileUtils;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class FlumeSinkProcessorTest {
    +
    +  private static final Logger logger =
    +      LoggerFactory.getLogger(FlumeSinkProcessorTest.class);
    +  
    +    @Test
    +    public void testValidators() {
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
    +        Collection<ValidationResult> results;
    +        ProcessContext pc;
    +
    +        results = new HashSet<>();
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because Sink Type is required"));
    +        }
    +
    +        // non-existent class
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, "invalid.class.name");
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because unable to load sink"));
    +        }
    +
    +        // class doesn't implement Sink
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, AvroSource.class.getName());
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because unable to create sink"));
    +        }
    +
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(0, results.size());
    +    }
    +
    +
    +    @Test
    +    public void testNullSink() throws IOException {
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
    +        FileInputStream fis = new FileInputStream("src/test/resources/testdata/records.txt");
    --- End diff --
    
    Please use getResourceAsStream so that this test resource is loaded from the classpath (Maven's target/ output) rather than from the source tree. That will let us add pre-processing later if necessary.
    
    i.e.
        
        InputStream is = getClass().getResourceAsStream("/testdata/records.txt");
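    
    A slightly fuller sketch of that pattern, assuming a hypothetical helper class `ResourceLoadingSketch` that is not part of this PR. It closes the stream with try-with-resources and returns null when the resource is missing, since getResourceAsStream returns null instead of throwing:
    
    ```java
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;

    // Hypothetical helper for illustration; the class and method names are assumptions.
    public class ResourceLoadingSketch {

        // Reads a classpath resource fully, or returns null if it is not on the classpath.
        public static byte[] readResource(Class<?> anchor, String path) throws IOException {
            // A null resource is allowed in try-with-resources; close() is simply skipped.
            try (InputStream is = anchor.getResourceAsStream(path)) {
                if (is == null) {
                    return null;
                }
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buf = new byte[4096];
                int n;
                while ((n = is.read(buf)) != -1) {
                    out.write(buf, 0, n);
                }
                return out.toByteArray();
            }
        }

        public static void main(String[] args) throws IOException {
            // A leading slash makes the path absolute within the classpath.
            byte[] data = readResource(ResourceLoadingSketch.class, "/testdata/records.txt");
            System.out.println(data == null ? "resource not found" : data.length + " bytes read");
        }
    }
    ```
    
    In the test itself the stream could be passed straight to runner.enqueue(...), which the current code already calls with a FileInputStream.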
        


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r31857427
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java ---
    @@ -0,0 +1,193 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +import com.google.common.base.Throwables;
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.ImmutableSet;
    +import com.google.common.collect.Lists;
    +import java.util.List;
    +import java.util.Set;
    +import org.apache.flume.Context;
    +import org.apache.flume.EventDeliveryException;
    +import org.apache.flume.Sink;
    +import org.apache.flume.Transaction;
    +import org.apache.flume.channel.MemoryChannel;
    +import org.apache.flume.conf.Configurables;
    +import org.apache.jasper.compiler.JspUtil;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.SchedulingContext;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.flume.util.FlowFileEvent;
    +
    +/**
    + * This processor runs a Flume sink
    + */
    +@Tags({"flume", "hadoop", "get", "sink"})
    +@CapabilityDescription("Generate FlowFile data from a Flume sink")
    +public class FlumeSinkProcessor extends AbstractFlumeProcessor {
    +
    +    private Sink sink;
    +    private MemoryChannel channel;
    +
    +    public static final PropertyDescriptor SINK_TYPE = new PropertyDescriptor.Builder()
    +            .name("Sink Type")
    +            .description("The fully-qualified name of the Sink class")
    +            .required(true)
    +            .addValidator(createSinkValidator())
    +            .build();
    +    public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder()
    +            .name("Agent Name")
    +            .description("The name of the agent used in the Flume sink configuration")
    +            .required(true)
    +            .defaultValue("tier1")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder()
    +            .name("Sink Name")
    +            .description("The name of the sink used in the Flume sink configuration")
    +            .required(true)
    +            .defaultValue("sink-1")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder()
    +            .name("Flume Configuration")
    +            .description("The Flume configuration for the sink copied from the flume.properties file")
    +            .required(true)
    +            .defaultValue("")
    +            .addValidator(Validator.VALID)
    +            .build();
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("Batch Size")
    +            .description("The number of FlowFiles to process in a single batch")
    +            .required(true)
    +            .defaultValue("100")
    +            .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
    +    public static final Relationship FAILURE = new Relationship.Builder().name("failure").build();
    +
    +    private List<PropertyDescriptor> descriptors;
    +    private Set<Relationship> relationships;
    +    private int batchSize;
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG, BATCH_SIZE);
    +        this.relationships = ImmutableSet.of(SUCCESS, FAILURE);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final SchedulingContext context) {
    +        batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +
    +        try {
    +            channel = new MemoryChannel();
    +            Context memoryChannelContext = new Context();
    +            memoryChannelContext.put("capacity", String.valueOf(batchSize*10));
    +            memoryChannelContext.put("transactionCapacity", String.valueOf(batchSize*10));
    +            Configurables.configure(channel, memoryChannelContext);
    +            channel.start();
    +
    +            sink = SINK_FACTORY.create(context.getProperty(SOURCE_NAME).getValue(),
    +                    context.getProperty(SINK_TYPE).getValue());
    +            sink.setChannel(channel);
    +
    +            String flumeConfig = context.getProperty(FLUME_CONFIG).getValue();
    +            String agentName = context.getProperty(AGENT_NAME).getValue();
    +            String sinkName = context.getProperty(SOURCE_NAME).getValue();
    +            Configurables.configure(sink,
    +                    getFlumeSinkContext(flumeConfig, agentName, sinkName));
    +
    +            sink.start();
    +        } catch (Throwable th) {
    +            getLogger().error("Error creating sink", th);
    +            throw Throwables.propagate(th);
    +        }
    +    }
    +
    +    @OnUnscheduled
    +    public void unScheduled() {
    +        sink.stop();
    +        channel.stop();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context,
    +            final ProcessSession session) throws ProcessException {
    +        List<FlowFile> flowFiles = Lists.newArrayListWithExpectedSize(batchSize);
    +        for (int i = 0; i < batchSize; i++) {
    +            FlowFile flowFile = session.get();
    +            if (flowFile == null) {
    +              break;
    +            }
    +
    +            flowFiles.add(flowFile);
    +        }
    +
    +        Transaction transaction = channel.getTransaction();
    --- End diff --
    
    Why not wrap the current session in a Flume transaction like the polling source does?


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joey <gi...@git.apache.org>.
Github user joey commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-120740843
  
    Sorry, hit comment by accident. 
    
    This matches the behavior of Flume, so there's no loss in functionality for the user. In fact, a number of sinks/sources have config parameters to run threads internally for parallelism, so users can still take advantage of those. Finally, I reviewed some Flume code and I'm nervous about hitting subtle concurrency issues if we try to run sinks/sources multi-threaded.
    
    If performance is a real concern as users start working with it, we can work on adding it in the future.
    
    I should have an updated PR with the other comments addressed later today. 
    
    As for a drop-down of sources/sinks, Flume has no service loader mechanism, so I can't populate it generically. I could populate a combo box with the sources/sinks we ship, and users could still type in a class name if they added one after the fact.
    
    There is no API for introspecting config parameters, so we'll have to keep that a copy/paste experience. I agree with Ryan: if any sources/sinks see a lot of adoption, we should rewrite them as first-class processors.
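    
    A rough sketch of the "shipped list plus free-form class name" idea, in plain Java with no NiFi or Flume dependency. The class `SinkTypeChoicesSketch`, its methods, and the choice of which sinks to list are assumptions for illustration, not code from this PR:
    
    ```java
    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.Map;

    // Hypothetical sketch; names here are illustrative only.
    public class SinkTypeChoicesSketch {

        // Illustrative subset of shipped sinks: short name -> implementation class.
        public static final Map<String, String> SHIPPED;
        static {
            Map<String, String> m = new LinkedHashMap<>();
            m.put("null", "org.apache.flume.sink.NullSink");
            m.put("logger", "org.apache.flume.sink.LoggerSink");
            m.put("hdfs", "org.apache.flume.sink.hdfs.HDFSEventSink");
            SHIPPED = Collections.unmodifiableMap(m);
        }

        // A combo-box pick maps to its class; anything else is treated as a typed-in class name.
        public static String resolve(String chosenOrTyped) {
            String known = SHIPPED.get(chosenOrTyped);
            return known != null ? known : chosenOrTyped;
        }

        // True if the class can be found on the current classpath (without initializing it).
        public static boolean isLoadable(String className) {
            try {
                Class.forName(className, false, SinkTypeChoicesSketch.class.getClassLoader());
                return true;
            } catch (ClassNotFoundException | LinkageError e) {
                return false;
            }
        }

        public static void main(String[] args) {
            System.out.println(resolve("hdfs"));
            System.out.println(isLoadable("invalid.class.name"));
        }
    }
    ```
    
    A validator could call resolve first and isLoadable second, which would keep the existing "unable to load sink" check working for typed-in names.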


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-120770118
  
    I'd encourage going with serial-only mode for now. Let's learn more and iterate from there.


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by busbey <gi...@git.apache.org>.
Github user busbey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r29692723
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml ---
    @@ -0,0 +1,126 @@
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <!--
    +      Licensed to the Apache Software Foundation (ASF) under one or more
    +      contributor license agreements.  See the NOTICE file distributed with
    +      this work for additional information regarding copyright ownership.
    +      The ASF licenses this file to You under the Apache License, Version 2.0
    +      (the "License"); you may not use this file except in compliance with
    +      the License.  You may obtain a copy of the License at
    +          http://www.apache.org/licenses/LICENSE-2.0
    +      Unless required by applicable law or agreed to in writing, software
    +      distributed under the License is distributed on an "AS IS" BASIS,
    +      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +      See the License for the specific language governing permissions and
    +      limitations under the License.
    +    -->
    +    <modelVersion>4.0.0</modelVersion>
    +    <parent>
    +        <groupId>org.apache.nifi</groupId>
    +        <artifactId>nifi-flume-bundle</artifactId>
    +        <version>0.1.0-incubating-SNAPSHOT</version>
    +    </parent>
    +    <artifactId>nifi-flume-processors</artifactId>
    +    <packaging>jar</packaging>
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-api</artifactId>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-utils</artifactId>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-processor-utils</artifactId>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-flowfile-packager</artifactId>
    +        </dependency>
    +        <dependency> 
    +            <groupId>org.apache.flume</groupId> 
    +            <artifactId>flume-ng-sdk</artifactId> 
    +            <version>1.5.2</version>
    +        </dependency> 
    --- End diff --
    
    nit: trailing whitespace


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by busbey <gi...@git.apache.org>.
Github user busbey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r29685259
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java ---
    @@ -0,0 +1,165 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +import java.io.File;
    +import static org.junit.Assert.assertEquals;
    +
    +import java.io.FileInputStream;
    +import java.io.FilenameFilter;
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import org.apache.commons.io.filefilter.HiddenFileFilter;
    +import org.apache.flume.sink.NullSink;
    +import org.apache.flume.source.AvroSource;
    +
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.util.MockProcessContext;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.apache.nifi.util.file.FileUtils;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class FlumeSinkProcessorTest {
    +
    +  private static final Logger logger =
    +      LoggerFactory.getLogger(FlumeSinkProcessorTest.class);
    +  
    +    @Test
    +    public void testValidators() {
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
    +        Collection<ValidationResult> results;
    +        ProcessContext pc;
    +
    +        results = new HashSet<>();
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because Sink Type is required"));
    +        }
    +
    +        // non-existent class
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, "invalid.class.name");
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because unable to load sink"));
    +        }
    +
    +        // class doesn't implement Sink
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, AvroSource.class.getName());
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because unable to create sink"));
    +        }
    +
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(0, results.size());
    +    }
    +
    +
    +    @Test
    +    public void testNullSink() throws IOException {
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
    +        FileInputStream fis = new FileInputStream("src/test/resources/testdata/records.txt");
    +        Map<String, String> attributes = new HashMap<>();
    +        attributes.put(CoreAttributes.FILENAME.key(), "records.txt");
    +        runner.enqueue(fis, attributes);
    +        runner.run();
    +        fis.close();
    +    }
    +
    +    @Test
    +    public void testBatchSize() throws IOException {
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
    +        runner.setProperty(FlumeSinkProcessor.BATCH_SIZE, "1000");
    +        runner.setProperty(FlumeSinkProcessor.FLUME_CONFIG,
    +            "tier1.sinks.sink-1.batchSize = 1000\n");
    +        for (int i = 0; i < 100000; i++) {
    +          runner.enqueue(String.valueOf(i).getBytes());
    +        }
    +        runner.run();
    +    }
    +    
    +    @Test
    +    public void testHdfsSink() throws IOException {
    +        File destDir = new File("target/hdfs");
    +        if (destDir.exists()) {
    +          FileUtils.deleteFilesInDir(destDir, null, logger);
    +        } else {
    +          destDir.mkdirs();
    +        }
    +
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, "hdfs");
    +        runner.setProperty(FlumeSinkProcessor.FLUME_CONFIG,
    +            "tier1.sinks.sink-1.hdfs.path = " + destDir.toURI().toString() + "\n" +
    +            "tier1.sinks.sink-1.hdfs.fileType = DataStream\n" +
    +            "tier1.sinks.sink-1.hdfs.serializer = TEXT\n" +
    +            "tier1.sinks.sink-1.serializer.appendNewline = false"
    +        );
    +        FileInputStream fis = new FileInputStream("src/test/resources/testdata/records.txt");
    --- End diff --
    
    Please use getResourceAsStream so that this test resource is loaded from the classpath (Maven's target/ output) rather than from the source tree. That will let us add pre-processing later if necessary.
    
    i.e.
    
        InputStream is = getClass().getResourceAsStream("/testdata/records.txt");


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joey <gi...@git.apache.org>.
Github user joey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r31840911
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site.xml ---
    @@ -0,0 +1,25 @@
    +<?xml version="1.0"?>
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements.  See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License.  You may obtain a copy of the License at
    +      http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    +
    +<!-- Put site-specific property overrides in this file. -->
    +
    +<configuration>
    +    <property>
    +        <name>fs.defaultFS</name>
    +        <value>file:///</value>
    --- End diff --
    
    Setting `fs.defaultFS` to `file:///some/path` doesn't change the root of the local filesystem that Hadoop classes use; they always start at the root of the local filesystem.


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-116823666
  
    I think thread-local would be okay because the thread doesn't change while the process is running. In onTrigger, you would first set the thread's session, then call the Flume processor that pulls from the channel by accessing the session for its thread.
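    
    A minimal sketch of that flow in plain Java. `Session`, `takeFromChannel`, and this `onTrigger` are hypothetical stand-ins for NiFi's ProcessSession, the channel code the Flume sink would call, and the processor's onTrigger; none of them are the real APIs:
    
    ```java
    // Hypothetical sketch; all names are stand-ins, not NiFi or Flume classes.
    public class ThreadLocalSessionSketch {

        // Stand-in for NiFi's ProcessSession.
        public static final class Session {
            public final String id;
            public Session(String id) { this.id = id; }
        }

        // Each thread sees only the session it bound itself.
        private static final ThreadLocal<Session> CURRENT = new ThreadLocal<>();

        // Stand-in for the channel: it looks up the calling thread's session.
        public static String takeFromChannel() {
            Session s = CURRENT.get();
            if (s == null) {
                throw new IllegalStateException("no session bound to this thread");
            }
            return "event from " + s.id;
        }

        // Stand-in for onTrigger: bind the session, run the sink, always unbind.
        public static String onTrigger(Session session) {
            CURRENT.set(session);
            try {
                return takeFromChannel();
            } finally {
                CURRENT.remove();
            }
        }

        public static void main(String[] args) throws InterruptedException {
            Thread a = new Thread(() -> System.out.println(onTrigger(new Session("A"))));
            Thread b = new Thread(() -> System.out.println(onTrigger(new Session("B"))));
            a.start();
            b.start();
            a.join();
            b.join();
        }
    }
    ```
    
    Removing the binding in a finally block keeps a stale session from leaking into the next invocation on a pooled thread.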


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r31855713
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java ---
    @@ -0,0 +1,193 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +import com.google.common.base.Throwables;
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.ImmutableSet;
    +import com.google.common.collect.Lists;
    +import java.util.List;
    +import java.util.Set;
    +import org.apache.flume.Context;
    +import org.apache.flume.EventDeliveryException;
    +import org.apache.flume.Sink;
    +import org.apache.flume.Transaction;
    +import org.apache.flume.channel.MemoryChannel;
    +import org.apache.flume.conf.Configurables;
    +import org.apache.jasper.compiler.JspUtil;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.SchedulingContext;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.flume.util.FlowFileEvent;
    +
    +/**
    + * This processor runs a Flume sink
    + */
    +@Tags({"flume", "hadoop", "get", "sink"})
    +@CapabilityDescription("Generate FlowFile data from a Flume sink")
    +public class FlumeSinkProcessor extends AbstractFlumeProcessor {
    +
    +    private Sink sink;
    +    private MemoryChannel channel;
    +
    +    public static final PropertyDescriptor SINK_TYPE = new PropertyDescriptor.Builder()
    +            .name("Sink Type")
    +            .description("The fully-qualified name of the Sink class")
    +            .required(true)
    +            .addValidator(createSinkValidator())
    +            .build();
    +    public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder()
    +            .name("Agent Name")
    +            .description("The name of the agent used in the Flume sink configuration")
    +            .required(true)
    +            .defaultValue("tier1")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder()
    +            .name("Sink Name")
    +            .description("The name of the sink used in the Flume sink configuration")
    +            .required(true)
    +            .defaultValue("sink-1")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder()
    +            .name("Flume Configuration")
    +            .description("The Flume configuration for the sink copied from the flume.properties file")
    +            .required(true)
    +            .defaultValue("")
    +            .addValidator(Validator.VALID)
    +            .build();
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("Batch Size")
    +            .description("The number of FlowFiles to process in a single batch")
    +            .required(true)
    +            .defaultValue("100")
    +            .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
    +    public static final Relationship FAILURE = new Relationship.Builder().name("failure").build();
    +
    +    private List<PropertyDescriptor> descriptors;
    +    private Set<Relationship> relationships;
    +    private int batchSize;
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG, BATCH_SIZE);
    +        this.relationships = ImmutableSet.of(SUCCESS, FAILURE);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final SchedulingContext context) {
    +        batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +
    +        try {
    +            channel = new MemoryChannel();
    +            Context memoryChannelContext = new Context();
    +            memoryChannelContext.put("capacity", String.valueOf(batchSize*10));
    +            memoryChannelContext.put("transactionCapacity", String.valueOf(batchSize*10));
    +            Configurables.configure(channel, memoryChannelContext);
    +            channel.start();
    +
    +            sink = SINK_FACTORY.create(context.getProperty(SOURCE_NAME).getValue(),
    +                    context.getProperty(SINK_TYPE).getValue());
    +            sink.setChannel(channel);
    +
    +            String flumeConfig = context.getProperty(FLUME_CONFIG).getValue();
    +            String agentName = context.getProperty(AGENT_NAME).getValue();
    +            String sinkName = context.getProperty(SOURCE_NAME).getValue();
    +            Configurables.configure(sink,
    +                    getFlumeSinkContext(flumeConfig, agentName, sinkName));
    +
    +            sink.start();
    +        } catch (Throwable th) {
    +            getLogger().error("Error creating sink", th);
    +            throw Throwables.propagate(th);
    +        }
    +    }
    +
    +    @OnUnscheduled
    +    public void unScheduled() {
    +        sink.stop();
    +        channel.stop();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context,
    +            final ProcessSession session) throws ProcessException {
    +        List<FlowFile> flowFiles = Lists.newArrayListWithExpectedSize(batchSize);
    +        for (int i = 0; i < batchSize; i++) {
    +            FlowFile flowFile = session.get();
    +            if (flowFile == null) {
    +              break;
    +            }
    +
    +            flowFiles.add(flowFile);
    +        }
    +
    +        Transaction transaction = channel.getTransaction();
    +        try {
    +            transaction.begin();
    +            for (FlowFile flowFile : flowFiles) {
    +                channel.put(new FlowFileEvent(flowFile, session));
    +            }
    +            transaction.commit();
    +        } catch (Throwable th) {
    +            transaction.rollback();
    +            throw Throwables.propagate(th);
    +        } finally {
    +            transaction.close();
    +        }
    +
    +        try {
    +            Sink.Status status;
    +            do {
    +              status = sink.process();
    --- End diff --
    
    Nit: inconsistent indentation; the body of this `do` block is indented by 2 spaces rather than 4.


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r32168462
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java ---
    @@ -0,0 +1,203 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +import com.google.common.base.Throwables;
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.ImmutableSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +import org.apache.flume.EventDeliveryException;
    +import org.apache.flume.EventDrivenSource;
    +import org.apache.flume.PollableSource;
    +import org.apache.flume.Source;
    +import org.apache.flume.channel.ChannelProcessor;
    +import org.apache.flume.conf.Configurables;
    +import org.apache.flume.source.EventDrivenSourceRunner;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.SchedulingContext;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +/**
    + * This processor runs a Flume source
    + */
    +@Tags({"flume", "hadoop", "get", "source"})
    +@CapabilityDescription("Generate FlowFile data from a Flume source")
    +public class FlumeSourceProcessor extends AbstractFlumeProcessor {
    +
    +    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder()
    +        .name("Source Type")
    +        .description("The fully-qualified name of the Source class")
    +        .required(true)
    +        .addValidator(createSourceValidator())
    +        .build();
    +    public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder()
    +        .name("Agent Name")
    +        .description("The name of the agent used in the Flume source configuration")
    +        .required(true)
    +        .defaultValue("tier1")
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +    public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder()
    +        .name("Source Name")
    +        .description("The name of the source used in the Flume source configuration")
    +        .required(true)
    +        .defaultValue("src-1")
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +    public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder()
    +        .name("Flume Configuration")
    +        .description("The Flume configuration for the source copied from the flume.properties file")
    +        .required(true)
    +        .defaultValue("")
    +        .addValidator(Validator.VALID)
    +        .build();
    +
    +    public static final Relationship SUCCESS = new Relationship.Builder().name("success")
    +        .build();
    +
    +    private List<PropertyDescriptor> descriptors;
    +    private Set<Relationship> relationships;
    +
    +    private volatile Source source;
    +
    +    private final NifiSessionChannel pollableSourceChannel = new NifiSessionChannel(SUCCESS);
    +    private final AtomicReference<ProcessSessionFactory> sessionFactoryRef = new AtomicReference<>(null);
    +    private final AtomicReference<EventDrivenSourceRunner> runnerRef = new AtomicReference<>(null);
    +    private final AtomicReference<NifiSessionFactoryChannel> eventDrivenSourceChannelRef = new AtomicReference<>(null);
    +    private final AtomicReference<Boolean> stopping = new AtomicReference<>(false);
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        this.descriptors = ImmutableList.of(SOURCE_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG);
    +        this.relationships = ImmutableSet.of(SUCCESS);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final SchedulingContext context) {
    +        try {
    +            stopping.set(false);
    +            source = SOURCE_FACTORY.create(
    +                context.getProperty(SOURCE_NAME)
    +                .getValue(),
    +                context.getProperty(SOURCE_TYPE)
    +                .getValue());
    +
    +            String flumeConfig = context.getProperty(FLUME_CONFIG)
    +                .getValue();
    +            String agentName = context.getProperty(AGENT_NAME)
    +                .getValue();
    +            String sourceName = context.getProperty(SOURCE_NAME)
    +                .getValue();
    +            Configurables.configure(source,
    +                getFlumeSourceContext(flumeConfig, agentName, sourceName));
    +
    +            if (source instanceof PollableSource) {
    +                source.setChannelProcessor(new ChannelProcessor(
    +                    new NifiChannelSelector(pollableSourceChannel)));
    +                source.start();
    --- End diff --
    
    I don't think the source should be started until onTrigger, so that the ProcessSessionFactory can be set first. Otherwise, the source could start sending events to the channel, which will immediately fail because there is no session.
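
A minimal sketch of the deferred start described in this comment, not the PR's actual code. `DeferredStartSketch`, `Startable`, and the `Object` used in place of a session factory are hypothetical stand-ins so the example compiles without NiFi or Flume on the classpath. The idea is that scheduling only prepares the source, and the first trigger records the factory before starting the source exactly once.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class DeferredStartSketch {

    /** Hypothetical stand-in for the start() half of a Flume source's lifecycle. */
    public interface Startable {
        void start();
    }

    // Stand-in for the ProcessSessionFactory reference; plain Object keeps the sketch self-contained.
    private final AtomicReference<Object> sessionFactoryRef = new AtomicReference<>(null);
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final Startable source;

    public DeferredStartSketch(Startable source) {
        this.source = source;
    }

    /** Scheduling resets state but deliberately does not start the source. */
    public void onScheduled() {
        sessionFactoryRef.set(null);
        started.set(false);
    }

    /** The first trigger stores the factory, then starts the source exactly once. */
    public void onTrigger(Object sessionFactory) {
        sessionFactoryRef.compareAndSet(null, sessionFactory);
        if (started.compareAndSet(false, true)) {
            source.start();
        }
    }

    public boolean isStarted() {
        return started.get();
    }

    public boolean hasSessionFactory() {
        return sessionFactoryRef.get() != null;
    }
}
```

The `compareAndSet` guard means concurrent triggers cannot start the source twice, and the factory is always in place before `start()` runs.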


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-101665850
  
    Sean, this was a pretty awesome review. I also really like the JUnit temporary directory suggestion.


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by busbey <gi...@git.apache.org>.
Github user busbey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r29692716
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml ---
    @@ -0,0 +1,126 @@
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <!--
    +      Licensed to the Apache Software Foundation (ASF) under one or more
    +      contributor license agreements.  See the NOTICE file distributed with
    +      this work for additional information regarding copyright ownership.
    +      The ASF licenses this file to You under the Apache License, Version 2.0
    +      (the "License"); you may not use this file except in compliance with
    +      the License.  You may obtain a copy of the License at
    +          http://www.apache.org/licenses/LICENSE-2.0
    +      Unless required by applicable law or agreed to in writing, software
    +      distributed under the License is distributed on an "AS IS" BASIS,
    +      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +      See the License for the specific language governing permissions and
    +      limitations under the License.
    +    -->
    +    <modelVersion>4.0.0</modelVersion>
    +    <parent>
    +        <groupId>org.apache.nifi</groupId>
    +        <artifactId>nifi-flume-bundle</artifactId>
    +        <version>0.1.0-incubating-SNAPSHOT</version>
    +    </parent>
    +    <artifactId>nifi-flume-processors</artifactId>
    +    <packaging>jar</packaging>
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-api</artifactId>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-utils</artifactId>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-processor-utils</artifactId>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-flowfile-packager</artifactId>
    +        </dependency>
    +        <dependency> 
    +            <groupId>org.apache.flume</groupId> 
    +            <artifactId>flume-ng-sdk</artifactId> 
    --- End diff --
    
    nit: trailing whitespace


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-110475354
  
    @joey did you find a good way to have a reference to a `ProcessSession` outside of the `onTrigger` method?


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r31855669
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java ---
    @@ -0,0 +1,193 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +import com.google.common.base.Throwables;
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.ImmutableSet;
    +import com.google.common.collect.Lists;
    +import java.util.List;
    +import java.util.Set;
    +import org.apache.flume.Context;
    +import org.apache.flume.EventDeliveryException;
    +import org.apache.flume.Sink;
    +import org.apache.flume.Transaction;
    +import org.apache.flume.channel.MemoryChannel;
    +import org.apache.flume.conf.Configurables;
    +import org.apache.jasper.compiler.JspUtil;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.SchedulingContext;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.flume.util.FlowFileEvent;
    +
    +/**
    + * This processor runs a Flume sink
    + */
    +@Tags({"flume", "hadoop", "get", "sink"})
    +@CapabilityDescription("Generate FlowFile data from a Flume sink")
    +public class FlumeSinkProcessor extends AbstractFlumeProcessor {
    +
    +    private Sink sink;
    +    private MemoryChannel channel;
    +
    +    public static final PropertyDescriptor SINK_TYPE = new PropertyDescriptor.Builder()
    +            .name("Sink Type")
    +            .description("The fully-qualified name of the Sink class")
    +            .required(true)
    +            .addValidator(createSinkValidator())
    +            .build();
    +    public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder()
    +            .name("Agent Name")
    +            .description("The name of the agent used in the Flume sink configuration")
    +            .required(true)
    +            .defaultValue("tier1")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder()
    +            .name("Sink Name")
    +            .description("The name of the sink used in the Flume sink configuration")
    +            .required(true)
    +            .defaultValue("sink-1")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder()
    +            .name("Flume Configuration")
    +            .description("The Flume configuration for the sink copied from the flume.properties file")
    +            .required(true)
    +            .defaultValue("")
    +            .addValidator(Validator.VALID)
    +            .build();
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("Batch Size")
    +            .description("The number of FlowFiles to process in a single batch")
    +            .required(true)
    +            .defaultValue("100")
    +            .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
    +    public static final Relationship FAILURE = new Relationship.Builder().name("failure").build();
    +
    +    private List<PropertyDescriptor> descriptors;
    +    private Set<Relationship> relationships;
    +    private int batchSize;
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG, BATCH_SIZE);
    +        this.relationships = ImmutableSet.of(SUCCESS, FAILURE);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final SchedulingContext context) {
    +        batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +
    +        try {
    +            channel = new MemoryChannel();
    +            Context memoryChannelContext = new Context();
    +            memoryChannelContext.put("capacity", String.valueOf(batchSize*10));
    +            memoryChannelContext.put("transactionCapacity", String.valueOf(batchSize*10));
    +            Configurables.configure(channel, memoryChannelContext);
    +            channel.start();
    +
    +            sink = SINK_FACTORY.create(context.getProperty(SOURCE_NAME).getValue(),
    +                    context.getProperty(SINK_TYPE).getValue());
    +            sink.setChannel(channel);
    +
    +            String flumeConfig = context.getProperty(FLUME_CONFIG).getValue();
    +            String agentName = context.getProperty(AGENT_NAME).getValue();
    +            String sinkName = context.getProperty(SOURCE_NAME).getValue();
    +            Configurables.configure(sink,
    +                    getFlumeSinkContext(flumeConfig, agentName, sinkName));
    +
    +            sink.start();
    +        } catch (Throwable th) {
    +            getLogger().error("Error creating sink", th);
    +            throw Throwables.propagate(th);
    +        }
    +    }
    +
    +    @OnUnscheduled
    +    public void unScheduled() {
    +        sink.stop();
    +        channel.stop();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context,
    +            final ProcessSession session) throws ProcessException {
    +        List<FlowFile> flowFiles = Lists.newArrayListWithExpectedSize(batchSize);
    +        for (int i = 0; i < batchSize; i++) {
    --- End diff --
    
    Why not use [`ProcessSession.get(int max)`](https://github.com/apache/incubator-nifi/blob/develop/nifi/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java#L143)?
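
To illustrate the suggestion, here is a rough sketch comparing the manual loop from the diff with a single batched call. `Session` and `QueueSession` are hypothetical stand-ins that mimic only the two `get` overloads, so the example runs without NiFi on the classpath. In the processor itself the batched call would be `session.get(batchSize)` on the real `ProcessSession`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

public class BatchGetSketch {

    /** Hypothetical stand-in for the two ProcessSession.get overloads. */
    public interface Session<T> {
        T get();              // one item, or null when the queue is empty
        List<T> get(int max); // up to max items; an empty list when nothing is queued
    }

    /** Hypothetical in-memory session backed by a FIFO queue. */
    public static final class QueueSession<T> implements Session<T> {
        private final Queue<T> queue;

        public QueueSession(List<T> items) {
            this.queue = new ArrayDeque<>(items);
        }

        @Override
        public T get() {
            return queue.poll();
        }

        @Override
        public List<T> get(int max) {
            List<T> out = new ArrayList<>();
            while (out.size() < max && !queue.isEmpty()) {
                out.add(queue.poll());
            }
            return out;
        }
    }

    /** The pattern in the diff: poll one item at a time until the batch is full or the queue is empty. */
    public static <T> List<T> drainWithLoop(Session<T> session, int batchSize) {
        List<T> items = new ArrayList<>(batchSize);
        for (int i = 0; i < batchSize; i++) {
            T item = session.get();
            if (item == null) {
                break;
            }
            items.add(item);
        }
        return items;
    }

    /** The suggested replacement: one batched call, no null check needed. */
    public static <T> List<T> drainWithBatchGet(Session<T> session, int batchSize) {
        return session.get(batchSize);
    }

    public static void main(String[] args) {
        List<Integer> queued = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(drainWithLoop(new QueueSession<>(queued), 3));
        System.out.println(drainWithBatchGet(new QueueSession<>(queued), 3));
    }
}
```

Both methods return the same batch; the batched form simply removes the loop and the null check from `onTrigger`.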


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by busbey <gi...@git.apache.org>.
Github user busbey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r29686788
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java ---
    @@ -0,0 +1,165 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +import java.io.File;
    +import static org.junit.Assert.assertEquals;
    +
    +import java.io.FileInputStream;
    +import java.io.FilenameFilter;
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import org.apache.commons.io.filefilter.HiddenFileFilter;
    +import org.apache.flume.sink.NullSink;
    +import org.apache.flume.source.AvroSource;
    +
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.util.MockProcessContext;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.apache.nifi.util.file.FileUtils;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class FlumeSinkProcessorTest {
    +
    +  private static final Logger logger =
    +      LoggerFactory.getLogger(FlumeSinkProcessorTest.class);
    +  
    +    @Test
    +    public void testValidators() {
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
    +        Collection<ValidationResult> results;
    +        ProcessContext pc;
    +
    +        results = new HashSet<>();
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because Sink Type is required"));
    +        }
    +
    +        // non-existent class
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, "invalid.class.name");
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because unable to load sink"));
    +        }
    +
    +        // class doesn't implement Sink
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, AvroSource.class.getName());
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because unable to create sink"));
    +        }
    +
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(0, results.size());
    +    }
    +
    +
    +    @Test
    +    public void testNullSink() throws IOException {
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
    +        FileInputStream fis = new FileInputStream("src/test/resources/testdata/records.txt");
    +        Map<String, String> attributes = new HashMap<>();
    +        attributes.put(CoreAttributes.FILENAME.key(), "records.txt");
    +        runner.enqueue(fis, attributes);
    +        runner.run();
    +        fis.close();
    +    }
    +
    +    @Test
    +    public void testBatchSize() throws IOException {
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
    +        runner.setProperty(FlumeSinkProcessor.BATCH_SIZE, "1000");
    +        runner.setProperty(FlumeSinkProcessor.FLUME_CONFIG,
    +            "tier1.sinks.sink-1.batchSize = 1000\n");
    +        for (int i = 0; i < 100000; i++) {
    +          runner.enqueue(String.valueOf(i).getBytes());
    +        }
    +        runner.run();
    +    }
    +    
    +    @Test
    +    public void testHdfsSink() throws IOException {
    +        File destDir = new File("target/hdfs");
    --- End diff --
    
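    Please get this directory from JUnit / Maven instead of hard-coding it, for example by using JUnit's TemporaryFolder rule:
    
        @Rule
        public final TemporaryFolder temp = new TemporaryFolder();
    
        @Test
        public void someTest() throws IOException {
            // newFolder() declares IOException; the rule deletes the folder after the test
            File destDir = temp.newFolder("hdfs");
            // ...
        }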


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by busbey <gi...@git.apache.org>.
Github user busbey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r31842097
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site.xml ---
    @@ -0,0 +1,25 @@
    +<?xml version="1.0"?>
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements.  See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License.  You may obtain a copy of the License at
    +      http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    +
    +<!-- Put site-specific property overrides in this file. -->
    +
    +<configuration>
    +    <property>
    +        <name>fs.defaultFS</name>
    +        <value>file:///</value>
    --- End diff --
    
    That is terrible and sounds like a bug.


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r32168616
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java ---
    @@ -0,0 +1,203 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +import com.google.common.base.Throwables;
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.ImmutableSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +import org.apache.flume.EventDeliveryException;
    +import org.apache.flume.EventDrivenSource;
    +import org.apache.flume.PollableSource;
    +import org.apache.flume.Source;
    +import org.apache.flume.channel.ChannelProcessor;
    +import org.apache.flume.conf.Configurables;
    +import org.apache.flume.source.EventDrivenSourceRunner;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.SchedulingContext;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +/**
    + * This processor runs a Flume source
    + */
    +@Tags({"flume", "hadoop", "get", "source"})
    +@CapabilityDescription("Generate FlowFile data from a Flume source")
    +public class FlumeSourceProcessor extends AbstractFlumeProcessor {
    +
    +    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder()
    +        .name("Source Type")
    +        .description("The fully-qualified name of the Source class")
    +        .required(true)
    +        .addValidator(createSourceValidator())
    +        .build();
    +    public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder()
    +        .name("Agent Name")
    +        .description("The name of the agent used in the Flume source configuration")
    +        .required(true)
    +        .defaultValue("tier1")
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +    public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder()
    +        .name("Source Name")
    +        .description("The name of the source used in the Flume source configuration")
    +        .required(true)
    +        .defaultValue("src-1")
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +    public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder()
    +        .name("Flume Configuration")
    +        .description("The Flume configuration for the source copied from the flume.properties file")
    +        .required(true)
    +        .defaultValue("")
    +        .addValidator(Validator.VALID)
    +        .build();
    +
    +    public static final Relationship SUCCESS = new Relationship.Builder().name("success")
    +        .build();
    +
    +    private List<PropertyDescriptor> descriptors;
    +    private Set<Relationship> relationships;
    +
    +    private volatile Source source;
    +
    +    private final NifiSessionChannel pollableSourceChannel = new NifiSessionChannel(SUCCESS);
    +    private final AtomicReference<ProcessSessionFactory> sessionFactoryRef = new AtomicReference<>(null);
    +    private final AtomicReference<EventDrivenSourceRunner> runnerRef = new AtomicReference<>(null);
    +    private final AtomicReference<NifiSessionFactoryChannel> eventDrivenSourceChannelRef = new AtomicReference<>(null);
    +    private final AtomicReference<Boolean> stopping = new AtomicReference<>(false);
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        this.descriptors = ImmutableList.of(SOURCE_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG);
    +        this.relationships = ImmutableSet.of(SUCCESS);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final SchedulingContext context) {
    +        try {
    +            stopping.set(false);
    +            source = SOURCE_FACTORY.create(
    +                context.getProperty(SOURCE_NAME)
    +                .getValue(),
    +                context.getProperty(SOURCE_TYPE)
    +                .getValue());
    +
    +            String flumeConfig = context.getProperty(FLUME_CONFIG)
    +                .getValue();
    +            String agentName = context.getProperty(AGENT_NAME)
    +                .getValue();
    +            String sourceName = context.getProperty(SOURCE_NAME)
    +                .getValue();
    +            Configurables.configure(source,
    +                getFlumeSourceContext(flumeConfig, agentName, sourceName));
    +
    +            if (source instanceof PollableSource) {
    +                source.setChannelProcessor(new ChannelProcessor(
    +                    new NifiChannelSelector(pollableSourceChannel)));
    +                source.start();
    +            }
    +        } catch (Throwable th) {
    +            getLogger()
    +                .error("Error creating source", th);
    +            throw Throwables.propagate(th);
    +        }
    +    }
    +
    +    @OnUnscheduled
    +    public void unScheduled() {
    +        stopping.set(true);
    +        if (source instanceof PollableSource) {
    +            source.stop();
    +        } else {
    +            EventDrivenSourceRunner runner = runnerRef.get();
    +            if (runner != null) {
    +                runner.stop();
    +                runnerRef.compareAndSet(runner, null);
    +            }
    +
    +            NifiSessionFactoryChannel eventDrivenSourceChannel = eventDrivenSourceChannelRef.get();
    +            if (eventDrivenSourceChannel != null) {
    +                eventDrivenSourceChannel.stop();
    +                eventDrivenSourceChannelRef.compareAndSet(eventDrivenSourceChannel, null);
    +            }
    +        }
    +    }
    +
    +    @OnStopped
    +    public void stopped() {
    +        sessionFactoryRef.set(null);
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
    +        if (source instanceof PollableSource) {
    +            super.onTrigger(context, sessionFactory);
    +        } else if (source instanceof EventDrivenSource) {
    +            ProcessSessionFactory old = sessionFactoryRef.getAndSet(sessionFactory);
    +            if (old == null) {
    --- End diff --
    
    Doesn't this need to happen if `old` is ever not the same as `sessionFactory`? It would be good to log or throw an exception because we don't expect that to ever happen, right?
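
One way to picture the check suggested above is the sketch below. It is not the PR's code: `FactoryTracker` and its `Change` enum are hypothetical names, and the generic type `F` stands in for `ProcessSessionFactory` so the example compiles without NiFi on the classpath. It only illustrates telling apart "first trigger", "same factory as before", and "factory was replaced".

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the processor's session-factory bookkeeping.
// F plays the role of ProcessSessionFactory in this sketch.
final class FactoryTracker<F> {
    enum Change { FIRST, SAME, REPLACED }

    private final AtomicReference<F> ref = new AtomicReference<>(null);

    // Atomically stores the current factory and reports how it relates
    // to the one seen on the previous call.
    Change update(F current) {
        F old = ref.getAndSet(current);
        if (old == null) {
            return Change.FIRST;    // first trigger: set up the source
        }
        if (old == current) {
            return Change.SAME;     // expected steady state: nothing to do
        }
        return Change.REPLACED;     // unexpected: log or throw, then set up again
    }

    public static void main(String[] args) {
        FactoryTracker<Object> tracker = new FactoryTracker<>();
        Object a = new Object();
        Object b = new Object();
        System.out.println(tracker.update(a));
        System.out.println(tracker.update(a));
        System.out.println(tracker.update(b));
    }
}
```

In the processor, the `REPLACED` branch is where a warning or an exception could go, assuming the framework is not expected to hand over a different factory between triggers.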


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joey <gi...@git.apache.org>.
Github user joey commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-110907254
  
    Thanks @markap14!


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joey <gi...@git.apache.org>.
Github user joey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r32167734
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java ---
    @@ -0,0 +1,146 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +import com.google.common.base.Throwables;
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.ImmutableSet;
    +import java.util.List;
    +import java.util.Set;
    +import org.apache.flume.Context;
    +import org.apache.flume.EventDeliveryException;
    +import org.apache.flume.Sink;
    +import org.apache.flume.conf.Configurables;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.SchedulingContext;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +/**
    + * This processor runs a Flume sink
    + */
    +@Tags({"flume", "hadoop", "get", "sink"})
    +@CapabilityDescription("Write FlowFile data to a Flume sink")
    +public class FlumeSinkProcessor extends AbstractFlumeProcessor {
    +
    +    public static final PropertyDescriptor SINK_TYPE = new PropertyDescriptor.Builder()
    +            .name("Sink Type")
    +            .description("The fully-qualified name of the Sink class")
    +            .required(true)
    +            .addValidator(createSinkValidator())
    +            .build();
    +    public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder()
    +            .name("Agent Name")
    +            .description("The name of the agent used in the Flume sink configuration")
    +            .required(true)
    +            .defaultValue("tier1")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder()
    +            .name("Sink Name")
    +            .description("The name of the sink used in the Flume sink configuration")
    +            .required(true)
    +            .defaultValue("sink-1")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder()
    +            .name("Flume Configuration")
    +            .description("The Flume configuration for the sink copied from the flume.properties file")
    +            .required(true)
    +            .defaultValue("")
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
    +    public static final Relationship FAILURE = new Relationship.Builder().name("failure").build();
    +
    +    private List<PropertyDescriptor> descriptors;
    +    private Set<Relationship> relationships;
    +
    +    private volatile Sink sink;
    +    private volatile NifiSinkSessionChannel channel;
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG);
    +        this.relationships = ImmutableSet.of(SUCCESS, FAILURE);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final SchedulingContext context) {
    +        try {
    +            channel = new NifiSinkSessionChannel(SUCCESS, FAILURE);
    +            Configurables.configure(channel, new Context());
    --- End diff --
    
    That's true, I can remove it.


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by busbey <gi...@git.apache.org>.
Github user busbey commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-99091760
  
    Please update the commit message to reference the JIRA issue (NIFI-589), and squash the changes into a single commit.


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by busbey <gi...@git.apache.org>.
Github user busbey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r29672432
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEvent.java ---
    @@ -0,0 +1,114 @@
    +
    --- End diff --
    
    This class needs a license header.


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by busbey <gi...@git.apache.org>.
Github user busbey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r31842170
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site.xml ---
    @@ -0,0 +1,25 @@
    +<?xml version="1.0"?>
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements.  See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License.  You may obtain a copy of the License at
    +      http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    +
    +<!-- Put site-specific property overrides in this file. -->
    +
    +<configuration>
    +    <property>
    +        <name>fs.defaultFS</name>
    +        <value>file:///</value>
    --- End diff --
    
    Ugh, there is no way to mark concerns as "resolved" in GitHub.
    
    So I'll just leave another comment: I agree that the bug in Hadoop means there's no sense in updating this.


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joey <gi...@git.apache.org>.
Github user joey commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-121093612
  
    I meant to do this yesterday, but I've now uploaded a couple of templates to the JIRA for testing flows. They're really basic, as most of the "fun" flows require a running Hadoop cluster and I didn't want to assume that. If you want some of those for testing, let me know.


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joey <gi...@git.apache.org>.
Github user joey commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-109441363
  
    I did a rebase and a force push. @busbey, when you get a chance can you do a quick review to make sure that I caught all of your feedback?


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joey <gi...@git.apache.org>.
Github user joey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r31857365
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java ---
    @@ -0,0 +1,165 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +import java.io.File;
    +import static org.junit.Assert.assertEquals;
    +
    +import java.io.FileInputStream;
    +import java.io.FilenameFilter;
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import org.apache.commons.io.filefilter.HiddenFileFilter;
    +import org.apache.flume.sink.NullSink;
    +import org.apache.flume.source.AvroSource;
    +
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.util.MockProcessContext;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.apache.nifi.util.file.FileUtils;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class FlumeSinkProcessorTest {
    +
    +  private static final Logger logger =
    +      LoggerFactory.getLogger(FlumeSinkProcessorTest.class);
    +  
    +    @Test
    +    public void testValidators() {
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
    +        Collection<ValidationResult> results;
    +        ProcessContext pc;
    +
    +        results = new HashSet<>();
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because Sink Type is required"));
    +        }
    +
    +        // non-existent class
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, "invalid.class.name");
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because unable to load sink"));
    +        }
    +
    +        // class doesn't implement Sink
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, AvroSource.class.getName());
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because unable to create sink"));
    +        }
    +
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(0, results.size());
    +    }
    +
    +
    +    @Test
    +    public void testNullSink() throws IOException {
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
    +        FileInputStream fis = new FileInputStream("src/test/resources/testdata/records.txt");
    +        Map<String, String> attributes = new HashMap<>();
    +        attributes.put(CoreAttributes.FILENAME.key(), "records.txt");
    +        runner.enqueue(fis, attributes);
    +        runner.run();
    +        fis.close();
    +    }
    +
    +    @Test
    +    public void testBatchSize() throws IOException {
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
    +        runner.setProperty(FlumeSinkProcessor.BATCH_SIZE, "1000");
    +        runner.setProperty(FlumeSinkProcessor.FLUME_CONFIG,
    +            "tier1.sinks.sink-1.batchSize = 1000\n");
    +        for (int i = 0; i < 100000; i++) {
    +          runner.enqueue(String.valueOf(i).getBytes());
    +        }
    +        runner.run();
    +    }
    +    
    +    @Test
    +    public void testHdfsSink() throws IOException {
    +        File destDir = new File("target/hdfs");
    --- End diff --
    
    I think I can have it both ways. I just pushed a change that sets the `java.io.tmpdir` system property for surefire tests to the build project directory.
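
As a rough illustration of why redirecting `java.io.tmpdir` helps, the sketch below resolves a scratch directory under whatever that property points to. `ScratchDirs` and the directory name `hdfs-sketch` are hypothetical and not part of the PR. The assumption is that the build sets the property for the test JVM, so the directory lands inside the project's build output rather than the system temp location.

```java
import java.io.File;

// Hypothetical helper: creates a named scratch directory under java.io.tmpdir.
final class ScratchDirs {
    static File create(String name) {
        File base = new File(System.getProperty("java.io.tmpdir"));
        File dir = new File(base, name);
        // mkdirs() returns false when the directory already exists,
        // so also accept an existing directory.
        if (!dir.mkdirs() && !dir.isDirectory()) {
            throw new IllegalStateException("Unable to create " + dir);
        }
        return dir;
    }

    public static void main(String[] args) {
        File destDir = create("hdfs-sketch");
        System.out.println("Scratch directory: " + destDir.getAbsolutePath());
    }
}
```

JUnit's `TemporaryFolder` rule, when constructed without a parent folder, also creates its folders under the default temporary directory, so it should follow the same property when the property is set at JVM startup.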


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joey <gi...@git.apache.org>.
Github user joey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r32168806
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java ---
    @@ -0,0 +1,203 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +import com.google.common.base.Throwables;
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.ImmutableSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +import org.apache.flume.EventDeliveryException;
    +import org.apache.flume.EventDrivenSource;
    +import org.apache.flume.PollableSource;
    +import org.apache.flume.Source;
    +import org.apache.flume.channel.ChannelProcessor;
    +import org.apache.flume.conf.Configurables;
    +import org.apache.flume.source.EventDrivenSourceRunner;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.SchedulingContext;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +/**
    + * This processor runs a Flume source
    + */
    +@Tags({"flume", "hadoop", "get", "source"})
    +@CapabilityDescription("Generate FlowFile data from a Flume source")
    +public class FlumeSourceProcessor extends AbstractFlumeProcessor {
    +
    +    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder()
    +        .name("Source Type")
    +        .description("The fully-qualified name of the Source class")
    +        .required(true)
    +        .addValidator(createSourceValidator())
    +        .build();
    +    public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder()
    +        .name("Agent Name")
    +        .description("The name of the agent used in the Flume source configuration")
    +        .required(true)
    +        .defaultValue("tier1")
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +    public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder()
    +        .name("Source Name")
    +        .description("The name of the source used in the Flume source configuration")
    +        .required(true)
    +        .defaultValue("src-1")
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +    public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder()
    +        .name("Flume Configuration")
    +        .description("The Flume configuration for the source copied from the flume.properties file")
    +        .required(true)
    +        .defaultValue("")
    +        .addValidator(Validator.VALID)
    +        .build();
    +
    +    public static final Relationship SUCCESS = new Relationship.Builder().name("success")
    +        .build();
    +
    +    private List<PropertyDescriptor> descriptors;
    +    private Set<Relationship> relationships;
    +
    +    private volatile Source source;
    +
    +    private final NifiSessionChannel pollableSourceChannel = new NifiSessionChannel(SUCCESS);
    +    private final AtomicReference<ProcessSessionFactory> sessionFactoryRef = new AtomicReference<>(null);
    +    private final AtomicReference<EventDrivenSourceRunner> runnerRef = new AtomicReference<>(null);
    +    private final AtomicReference<NifiSessionFactoryChannel> eventDrivenSourceChannelRef = new AtomicReference<>(null);
    +    private final AtomicReference<Boolean> stopping = new AtomicReference<>(false);
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        this.descriptors = ImmutableList.of(SOURCE_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG);
    +        this.relationships = ImmutableSet.of(SUCCESS);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final SchedulingContext context) {
    +        try {
    +            stopping.set(false);
    +            source = SOURCE_FACTORY.create(
    +                context.getProperty(SOURCE_NAME)
    +                .getValue(),
    +                context.getProperty(SOURCE_TYPE)
    +                .getValue());
    +
    +            String flumeConfig = context.getProperty(FLUME_CONFIG)
    +                .getValue();
    +            String agentName = context.getProperty(AGENT_NAME)
    +                .getValue();
    +            String sourceName = context.getProperty(SOURCE_NAME)
    +                .getValue();
    +            Configurables.configure(source,
    +                getFlumeSourceContext(flumeConfig, agentName, sourceName));
    +
    +            if (source instanceof PollableSource) {
    +                source.setChannelProcessor(new ChannelProcessor(
    +                    new NifiChannelSelector(pollableSourceChannel)));
    +                source.start();
    --- End diff --
    
    The `PollableSource` will only use the channel on calls to `#process()`
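
As an illustration of that point, here is a minimal sketch built on stand-in types rather than Flume's real interfaces. `EventSink`, `CountingPollableSource` and `RecordingChannel` are hypothetical names that do not appear in the pull request. The sketch only shows a source that leaves its channel alone in `start()` and writes to it while the caller is inside `process()`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the channel a source writes to. */
interface EventSink {
    void put(String event);
}

/** Hypothetical stand-in for a pollable source: it emits only inside process(). */
final class CountingPollableSource {
    private final EventSink channel;
    private int next;

    CountingPollableSource(EventSink channel) {
        this.channel = channel;
    }

    /** Starting the source performs no channel access. */
    void start() {
    }

    /** Each call made by the caller pushes exactly one event into the channel. */
    void process() {
        channel.put("event-" + next++);
    }
}

/** Hypothetical channel that records what it receives. */
final class RecordingChannel implements EventSink {
    final List<String> events = new ArrayList<>();

    @Override
    public void put(String event) {
        events.add(event);
    }
}
```

Because the caller controls every `process()` call, it can also decide what the channel is bound to for the duration of that call.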


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by busbey <gi...@git.apache.org>.
Github user busbey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r31843541
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java ---
    @@ -0,0 +1,165 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +import java.io.File;
    +import static org.junit.Assert.assertEquals;
    +
    +import java.io.FileInputStream;
    +import java.io.FilenameFilter;
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import org.apache.commons.io.filefilter.HiddenFileFilter;
    +import org.apache.flume.sink.NullSink;
    +import org.apache.flume.source.AvroSource;
    +
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.util.MockProcessContext;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.apache.nifi.util.file.FileUtils;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class FlumeSinkProcessorTest {
    +
    +  private static final Logger logger =
    +      LoggerFactory.getLogger(FlumeSinkProcessorTest.class);
    +  
    +    @Test
    +    public void testValidators() {
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
    +        Collection<ValidationResult> results;
    +        ProcessContext pc;
    +
    +        results = new HashSet<>();
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because Sink Type is required"));
    +        }
    +
    +        // non-existent class
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, "invalid.class.name");
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because unable to load sink"));
    +        }
    +
    +        // class doesn't implement Sink
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, AvroSource.class.getName());
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because unable to create sink"));
    +        }
    +
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(0, results.size());
    +    }
    +
    +
    +    @Test
    +    public void testNullSink() throws IOException {
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
    +        FileInputStream fis = new FileInputStream("src/test/resources/testdata/records.txt");
    +        Map<String, String> attributes = new HashMap<>();
    +        attributes.put(CoreAttributes.FILENAME.key(), "records.txt");
    +        runner.enqueue(fis, attributes);
    +        runner.run();
    +        fis.close();
    +    }
    +
    +    @Test
    +    public void testBatchSize() throws IOException {
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
    +        runner.setProperty(FlumeSinkProcessor.BATCH_SIZE, "1000");
    +        runner.setProperty(FlumeSinkProcessor.FLUME_CONFIG,
    +            "tier1.sinks.sink-1.batchSize = 1000\n");
    +        for (int i = 0; i < 100000; i++) {
    +          runner.enqueue(String.valueOf(i).getBytes());
    +        }
    +        runner.run();
    +    }
    +    
    +    @Test
    +    public void testHdfsSink() throws IOException {
    +        File destDir = new File("target/hdfs");
    --- End diff --
    
    Yes, it works. If you're going to muck with permissions to disallow changes by the creating user (like some HDFS tests do), then the deletion attempt will fail and JUnit will not notice. However, in that case maven clean would fail as well.
    
    If you're going to rely on maven clean instead, then you should be getting the location to write to from maven rather than hard-coding 'target' in the working directory.
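
A minimal sketch of that second option, assuming the build hands Maven's `${project.build.directory}` to the tests as a system property. The property name `test.build.dir` and the `BuildDirs` helper are hypothetical and not part of the pull request. The pom would have to set the property itself, for example through surefire's `systemPropertyVariables`.

```java
import java.io.File;

/** Hypothetical helper: resolves test output locations under Maven's build directory. */
final class BuildDirs {
    // Assumed property name; the pom would need to map it to ${project.build.directory}.
    static final String BUILD_DIR_PROPERTY = "test.build.dir";

    private BuildDirs() {
    }

    /** Returns a directory named child under the configured build directory. */
    static File resolve(String child) {
        // Fall back to "target" only when the build did not supply a location.
        String buildDir = System.getProperty(BUILD_DIR_PROPERTY, "target");
        return new File(buildDir, child);
    }
}
```

With something like this in place, the test could use `File destDir = BuildDirs.resolve("hdfs");` instead of the hard-coded path.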


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joey <gi...@git.apache.org>.
Github user joey commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-117358851
  
    I need to look at the threading stuff. The Flume API states that Sink and Source instances are not shared between threads, so we need to maintain that invariant. 
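
One possible way to keep that invariant, sketched with plain JDK classes and assuming Java 8 or later: give every calling thread its own instance. The `PerThread` holder is hypothetical and is not what the pull request does. Serializing all access to a single instance would be another option.

```java
import java.util.function.Supplier;

/** Hypothetical holder that gives each calling thread its own instance of T. */
final class PerThread<T> {
    private final ThreadLocal<T> instances;

    PerThread(Supplier<T> factory) {
        // The factory runs once per thread, on that thread's first call to get().
        this.instances = ThreadLocal.withInitial(factory);
    }

    /** Returns the instance owned by the calling thread, creating it if needed. */
    T get() {
        return instances.get();
    }

    /** Drops the calling thread's instance, for example after it has been stopped. */
    void release() {
        instances.remove();
    }
}
```

The trade-off is that every thread then needs its own start/stop lifecycle for the instance it owns.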


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joey <gi...@git.apache.org>.
Github user joey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r32037195
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java ---
    @@ -0,0 +1,193 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +import com.google.common.base.Throwables;
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.ImmutableSet;
    +import com.google.common.collect.Lists;
    +import java.util.List;
    +import java.util.Set;
    +import org.apache.flume.Context;
    +import org.apache.flume.EventDeliveryException;
    +import org.apache.flume.Sink;
    +import org.apache.flume.Transaction;
    +import org.apache.flume.channel.MemoryChannel;
    +import org.apache.flume.conf.Configurables;
    +import org.apache.jasper.compiler.JspUtil;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.SchedulingContext;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.flume.util.FlowFileEvent;
    +
    +/**
    + * This processor runs a Flume sink
    + */
    +@Tags({"flume", "hadoop", "get", "sink"})
    +@CapabilityDescription("Generate FlowFile data from a Flume sink")
    +public class FlumeSinkProcessor extends AbstractFlumeProcessor {
    +
    +    private Sink sink;
    +    private MemoryChannel channel;
    +
    +    public static final PropertyDescriptor SINK_TYPE = new PropertyDescriptor.Builder()
    +            .name("Sink Type")
    +            .description("The fully-qualified name of the Sink class")
    +            .required(true)
    +            .addValidator(createSinkValidator())
    +            .build();
    +    public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder()
    +            .name("Agent Name")
    +            .description("The name of the agent used in the Flume sink configuration")
    +            .required(true)
    +            .defaultValue("tier1")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder()
    +            .name("Sink Name")
    +            .description("The name of the sink used in the Flume sink configuration")
    +            .required(true)
    +            .defaultValue("sink-1")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder()
    +            .name("Flume Configuration")
    +            .description("The Flume configuration for the sink copied from the flume.properties file")
    +            .required(true)
    +            .defaultValue("")
    +            .addValidator(Validator.VALID)
    +            .build();
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("Batch Size")
    +            .description("The number of FlowFiles to process in a single batch")
    +            .required(true)
    +            .defaultValue("100")
    +            .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
    +    public static final Relationship FAILURE = new Relationship.Builder().name("failure").build();
    +
    +    private List<PropertyDescriptor> descriptors;
    +    private Set<Relationship> relationships;
    +    private int batchSize;
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG, BATCH_SIZE);
    +        this.relationships = ImmutableSet.of(SUCCESS, FAILURE);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final SchedulingContext context) {
    +        batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +
    +        try {
    +            channel = new MemoryChannel();
    +            Context memoryChannelContext = new Context();
    +            memoryChannelContext.put("capacity", String.valueOf(batchSize*10));
    +            memoryChannelContext.put("transactionCapacity", String.valueOf(batchSize*10));
    +            Configurables.configure(channel, memoryChannelContext);
    +            channel.start();
    +
    +            sink = SINK_FACTORY.create(context.getProperty(SOURCE_NAME).getValue(),
    +                    context.getProperty(SINK_TYPE).getValue());
    +            sink.setChannel(channel);
    +
    +            String flumeConfig = context.getProperty(FLUME_CONFIG).getValue();
    +            String agentName = context.getProperty(AGENT_NAME).getValue();
    +            String sinkName = context.getProperty(SOURCE_NAME).getValue();
    +            Configurables.configure(sink,
    +                    getFlumeSinkContext(flumeConfig, agentName, sinkName));
    +
    +            sink.start();
    +        } catch (Throwable th) {
    +            getLogger().error("Error creating sink", th);
    +            throw Throwables.propagate(th);
    +        }
    +    }
    +
    +    @OnUnscheduled
    +    public void unScheduled() {
    +        sink.stop();
    +        channel.stop();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context,
    +            final ProcessSession session) throws ProcessException {
    +        List<FlowFile> flowFiles = Lists.newArrayListWithExpectedSize(batchSize);
    +        for (int i = 0; i < batchSize; i++) {
    --- End diff --
    
    Because I didn't see that method :)


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by busbey <gi...@git.apache.org>.
Github user busbey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r29672263
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannelSelector.java ---
    @@ -0,0 +1,55 @@
    +
    --- End diff --
    
    this class needs a license header.


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joey <gi...@git.apache.org>.
Github user joey commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-120759891
  
    I rebased and pushed a new commit with most of the review feedback. I'm still working on the extended docs and a template to make testing with a running instance easier. I hope to have those published later today too.


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joey <gi...@git.apache.org>.
Github user joey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r31841067
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java ---
    @@ -0,0 +1,165 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +import java.io.File;
    +import static org.junit.Assert.assertEquals;
    +
    +import java.io.FileInputStream;
    +import java.io.FilenameFilter;
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import org.apache.commons.io.filefilter.HiddenFileFilter;
    +import org.apache.flume.sink.NullSink;
    +import org.apache.flume.source.AvroSource;
    +
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.util.MockProcessContext;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.apache.nifi.util.file.FileUtils;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class FlumeSinkProcessorTest {
    +
    +  private static final Logger logger =
    +      LoggerFactory.getLogger(FlumeSinkProcessorTest.class);
    +  
    +    @Test
    +    public void testValidators() {
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
    +        Collection<ValidationResult> results;
    +        ProcessContext pc;
    +
    +        results = new HashSet<>();
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because Sink Type is required"));
    +        }
    +
    +        // non-existent class
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, "invalid.class.name");
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because unable to load sink"));
    +        }
    +
    +        // class doesn't implement Sink
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, AvroSource.class.getName());
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because unable to create sink"));
    +        }
    +
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(0, results.size());
    +    }
    +
    +
    +    @Test
    +    public void testNullSink() throws IOException {
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
    +        FileInputStream fis = new FileInputStream("src/test/resources/testdata/records.txt");
    +        Map<String, String> attributes = new HashMap<>();
    +        attributes.put(CoreAttributes.FILENAME.key(), "records.txt");
    +        runner.enqueue(fis, attributes);
    +        runner.run();
    +        fis.close();
    +    }
    +
    +    @Test
    +    public void testBatchSize() throws IOException {
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
    +        runner.setProperty(FlumeSinkProcessor.BATCH_SIZE, "1000");
    +        runner.setProperty(FlumeSinkProcessor.FLUME_CONFIG,
    +            "tier1.sinks.sink-1.batchSize = 1000\n");
    +        for (int i = 0; i < 100000; i++) {
    +          runner.enqueue(String.valueOf(i).getBytes());
    +        }
    +        runner.run();
    +    }
    +    
    +    @Test
    +    public void testHdfsSink() throws IOException {
    +        File destDir = new File("target/hdfs");
    --- End diff --
    
    `TemporaryFolder` doesn't use the maven `target` directory. On my Mac, it uses directories that are created in `/var/folders`. Is there any concern that a `mvn clean` doesn't guarantee that those temporary files are cleaned up?
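
To make the concern concrete, here is a small JDK-only sketch (it does not use JUnit, and the `TempDirs` helper is hypothetical). A directory created in the default temporary location lives under `java.io.tmpdir`, outside the module's `target` directory, so `mvn clean` never sees it and the code that created it has to remove it.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

/** Hypothetical helper for scratch directories in the system temp location. */
final class TempDirs {
    private TempDirs() {
    }

    /** Creates a fresh directory under the default temp location (java.io.tmpdir). */
    static Path create(String prefix) throws IOException {
        return Files.createTempDirectory(prefix);
    }

    /** Deletes root and everything below it; failed deletions are silently ignored. */
    static void deleteRecursively(Path root) throws IOException {
        try (Stream<Path> paths = Files.walk(root)) {
            // Reverse order visits children before their parent directories.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
        }
    }
}
```

As written, a deletion that fails (for instance because of permissions) goes unnoticed, so anything left behind stays in the temp location until the operating system or a person clears it.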


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by busbey <gi...@git.apache.org>.
Github user busbey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r29672615
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/AbstractFlumeTest.java ---
    @@ -0,0 +1,35 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +import org.junit.BeforeClass;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class AbstractFlumeTest {
    +
    +    private static Logger logger;
    +
    +    @BeforeClass
    +    public static void setUpClass() {
    --- End diff --
    
    could we move these into a test logger configuration file?


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joey <gi...@git.apache.org>.
Github user joey commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-120964542
  
    You're right, it's a tedious process :) I think the Yetus project (being spun out of Hadoop) is planning to take a stab at having programmatic tools for rolling up NOTICES to hopefully make this less tedious.
    
    I didn't roll up the NOTICES/LICENSE file to the nifi-assembly. Let me know if you want me to take a stab at that or if y'all want to handle it.
    
    > The bundled dependencies include a rather old version of Jetty. I am not clear if this will conflict with the Jetty libraries we use and a quick scan of our docs suggests we don’t point out the limitations of Nar class loader isolation (will make a ticket). You may have no action here just bringing it to your attention.
    
    I noticed that too. It's the same version that's in the nifi-hadoop-libraries-nar, so I assume it's ok? I also don't think that any of the Flume components will use Jetty the way we're instantiating them. I think that's used by Flume to publish internal metrics data.



[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by busbey <gi...@git.apache.org>.
Github user busbey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r29692670
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml ---
    @@ -0,0 +1,126 @@
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <!--
    +      Licensed to the Apache Software Foundation (ASF) under one or more
    +      contributor license agreements.  See the NOTICE file distributed with
    +      this work for additional information regarding copyright ownership.
    +      The ASF licenses this file to You under the Apache License, Version 2.0
    +      (the "License"); you may not use this file except in compliance with
    +      the License.  You may obtain a copy of the License at
    +          http://www.apache.org/licenses/LICENSE-2.0
    +      Unless required by applicable law or agreed to in writing, software
    +      distributed under the License is distributed on an "AS IS" BASIS,
    +      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +      See the License for the specific language governing permissions and
    +      limitations under the License.
    +    -->
    +    <modelVersion>4.0.0</modelVersion>
    +    <parent>
    +        <groupId>org.apache.nifi</groupId>
    +        <artifactId>nifi-flume-bundle</artifactId>
    +        <version>0.1.0-incubating-SNAPSHOT</version>
    +    </parent>
    +    <artifactId>nifi-flume-processors</artifactId>
    +    <packaging>jar</packaging>
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-api</artifactId>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-utils</artifactId>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-processor-utils</artifactId>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-flowfile-packager</artifactId>
    +        </dependency>
    +        <dependency> 
    --- End diff --
    
    nit: trailing whitespace



[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joey <gi...@git.apache.org>.
Github user joey commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-110468716
  
    Thanks @joewitt.  I've been working through @rdblue's feedback. Right now, I'm seeing if I can have the event driven sources write directly to the NiFi session rather than through a MemoryChannel.
    
    After that, the last bit of feedback for me to take a look at is if I can simplify the logic of the FlumeSinkProcessor to avoid the MemoryChannel and wrap the Transaction directly. I should have an updated patch today or tomorrow.
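A minimal sketch of what "wrap the Transaction directly" could look like, under stated assumptions. `FakeSession` and `SessionTransaction` are hypothetical names made up for illustration; they stand in for a NiFi session and a Flume transaction and implement neither real interface. The method names only mirror Flume's `Transaction` (`begin`, `commit`, `rollback`, `close`) and `Channel.take()`. The routing of committed items to success and rolled-back items to failure is an assumption, not something confirmed by the patch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for a NiFi session: a queue of incoming items plus
// two outgoing routes. This is NOT the real ProcessSession API.
class FakeSession {
    final Deque<String> incoming = new ArrayDeque<>();
    final List<String> success = new ArrayList<>();
    final List<String> failure = new ArrayList<>();
}

// Transaction that reads straight from the session, with no intermediate
// in-memory channel holding events between the session and the sink.
class SessionTransaction {
    private final FakeSession session;
    private final List<String> taken = new ArrayList<>();
    private boolean open;

    SessionTransaction(FakeSession session) {
        this.session = session;
    }

    void begin() {
        open = true;
    }

    // Returns the next queued item, or null when the session has none left.
    String take() {
        if (!open) {
            throw new IllegalStateException("transaction not begun");
        }
        String item = session.incoming.poll();
        if (item != null) {
            taken.add(item);
        }
        return item;
    }

    // The sink delivered everything it took: route those items to success.
    void commit() {
        session.success.addAll(taken);
        taken.clear();
    }

    // The sink failed: route everything it took to failure (assumed policy).
    void rollback() {
        session.failure.addAll(taken);
        taken.clear();
    }

    void close() {
        open = false;
    }
}
```

With this shape, nothing is buffered outside the session: an item is either still queued in the session, or it has been routed by `commit()` or `rollback()`.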



[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-120966449
  
    NAR documentation fix: https://issues.apache.org/jira/browse/NIFI-763
    
    Tedious licenses: Yeah no worries.  I will roll it up once I merge it in.  Gives me a chance to thoroughly double check stuff anyway.  Thanks.  I feel like we could automate the generation of those project level license/notices anyway (yetus should put a good dent in this though).
    
    thanks



[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-120764934
  
    Another way to fix the threading issue is to validate that the sinks and sources are safe and wrap them in first-class processors that allow threading even if the general processor doesn't. The MorphlineSolrSink, for example, would run just fine and would be a really valuable processor (I talked with @whoschek about it last week).
    
    Joe, do you think that it would be possible to make the serial-only setting an option that gets exposed to the user?



[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r31858031
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java ---
    @@ -0,0 +1,191 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +import com.google.common.base.Throwables;
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.ImmutableSet;
    +import java.util.List;
    +import java.util.Set;
    +import org.apache.flume.Context;
    +import org.apache.flume.Event;
    +import org.apache.flume.EventDeliveryException;
    +import org.apache.flume.EventDrivenSource;
    +import org.apache.flume.PollableSource;
    +import org.apache.flume.Source;
    +import org.apache.flume.SourceRunner;
    +import org.apache.flume.Transaction;
    +import org.apache.flume.channel.ChannelProcessor;
    +import org.apache.flume.channel.MemoryChannel;
    +import org.apache.flume.conf.Configurables;
    +import org.apache.flume.source.EventDrivenSourceRunner;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.SchedulingContext;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +/**
    + * This processor runs a Flume source
    + */
    +@Tags({"flume", "hadoop", "get", "source"})
    +@CapabilityDescription("Generate FlowFile data from a Flume source")
    +public class FlumeSourceProcessor extends AbstractFlumeProcessor {
    +
    +    private Source source;
    +    private SourceRunner runner;
    +    private MemoryChannel channel;
    +
    +    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder()
    +            .name("Source Type")
    +            .description("The fully-qualified name of the Source class")
    +            .required(true)
    +            .addValidator(createSourceValidator())
    +            .build();
    +    public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder()
    +            .name("Agent Name")
    +            .description("The name of the agent used in the Flume source configuration")
    +            .required(true)
    +            .defaultValue("tier1")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder()
    +            .name("Source Name")
    +            .description("The name of the source used in the Flume source configuration")
    +            .required(true)
    +            .defaultValue("src-1")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder()
    +            .name("Flume Configuration")
    +            .description("The Flume configuration for the source copied from the flume.properties file")
    +            .required(true)
    +            .defaultValue("")
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
    +
    +    private List<PropertyDescriptor> descriptors;
    +    private Set<Relationship> relationships;
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        this.descriptors = ImmutableList.of(SOURCE_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG);
    +        this.relationships = ImmutableSet.of(SUCCESS);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final SchedulingContext context) {
    +        try {
    +            source = SOURCE_FACTORY.create(
    +                    context.getProperty(SOURCE_NAME).getValue(),
    +                    context.getProperty(SOURCE_TYPE).getValue());
    +
    +            String flumeConfig = context.getProperty(FLUME_CONFIG).getValue();
    +            String agentName = context.getProperty(AGENT_NAME).getValue();
    +            String sourceName = context.getProperty(SOURCE_NAME).getValue();
    +            Configurables.configure(source,
    +                    getFlumeSourceContext(flumeConfig, agentName, sourceName));
    +
    +            if (source instanceof EventDrivenSource) {
    --- End diff --
    
    If I understand correctly, the memory channel is used to buffer incoming events between calls to onTrigger because there is no session with which the events would be sent. But this makes all event-driven sources unsafe because the events that arrive between onTrigger calls are stored in memory.
    
    I think you instead need access to a ProcessSessionFactory. It looks like the Processor interface is passed a session factory when onTrigger is called, and [`AbstractProcessor`](https://github.com/apache/incubator-nifi/blob/develop/nifi/nifi-api/src/main/java/org/apache/nifi/processor/AbstractProcessor.java#L24) is what is giving you a session most of the time. I'm not sure if you can get one otherwise or if you can cache the instance, but I think this should be adding content directly to the repository or using a file channel (which can hopefully be avoided) rather than using a mem channel.
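A minimal sketch of the session-factory idea from the comment above, assuming the factory handed to `onTrigger` may be cached and used later from the source's own thread (the comment itself leaves that open). `Session`, `SessionFactory`, `DirectSessionWriter`, and `RecordingFactory` are hypothetical stand-ins written for this illustration; they are not NiFi's `ProcessSession` or `ProcessSessionFactory`, only reduced shapes of them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, reduced stand-ins for a NiFi session and session factory.
interface Session {
    void write(String body);
    void commit();
}

interface SessionFactory {
    Session createSession();
}

// Delivers each batch of events through its own session instead of parking
// the events in an in-memory channel between onTrigger calls.
class DirectSessionWriter {
    private final AtomicReference<SessionFactory> factoryRef = new AtomicReference<>();

    // Called on each trigger: remember the factory for later callbacks.
    void onTrigger(SessionFactory factory) {
        factoryRef.set(factory);
    }

    // Called by the event-driven source for each batch. Returns false when no
    // factory has been seen yet, so the caller can retry or push back.
    boolean deliver(List<String> batch) {
        SessionFactory factory = factoryRef.get();
        if (factory == null) {
            return false;
        }
        Session session = factory.createSession();
        for (String body : batch) {
            session.write(body);
        }
        session.commit();
        return true;
    }
}

// In-memory factory for trying the sketch out; committed bodies end up in
// the 'committed' list.
class RecordingFactory implements SessionFactory {
    final List<String> committed = new ArrayList<>();

    @Override
    public Session createSession() {
        final List<String> pending = new ArrayList<>();
        return new Session() {
            @Override
            public void write(String body) {
                pending.add(body);
            }

            @Override
            public void commit() {
                synchronized (committed) {
                    committed.addAll(pending);
                }
                pending.clear();
            }
        };
    }
}
```

The design point is that a batch is only acknowledged to the source after `commit()` returns, so nothing that has been accepted lives solely in memory. Returning `false` before the first trigger is one possible policy; blocking the source until a factory is available would be another.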



[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joey <gi...@git.apache.org>.
Github user joey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r32168929
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java ---
    @@ -0,0 +1,146 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +import com.google.common.base.Throwables;
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.ImmutableSet;
    +import java.util.List;
    +import java.util.Set;
    +import org.apache.flume.Context;
    +import org.apache.flume.EventDeliveryException;
    +import org.apache.flume.Sink;
    +import org.apache.flume.conf.Configurables;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.SchedulingContext;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +/**
    + * This processor runs a Flume sink
    + */
    +@Tags({"flume", "hadoop", "get", "sink"})
    +@CapabilityDescription("Write FlowFile data to a Flume sink")
    +public class FlumeSinkProcessor extends AbstractFlumeProcessor {
    +
    +    public static final PropertyDescriptor SINK_TYPE = new PropertyDescriptor.Builder()
    +            .name("Sink Type")
    +            .description("The fully-qualified name of the Sink class")
    +            .required(true)
    +            .addValidator(createSinkValidator())
    +            .build();
    +    public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder()
    +            .name("Agent Name")
    +            .description("The name of the agent used in the Flume sink configuration")
    +            .required(true)
    +            .defaultValue("tier1")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder()
    +            .name("Sink Name")
    +            .description("The name of the sink used in the Flume sink configuration")
    +            .required(true)
    +            .defaultValue("sink-1")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder()
    +            .name("Flume Configuration")
    +            .description("The Flume configuration for the sink copied from the flume.properties file")
    +            .required(true)
    +            .defaultValue("")
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
    +    public static final Relationship FAILURE = new Relationship.Builder().name("failure").build();
    +
    +    private List<PropertyDescriptor> descriptors;
    +    private Set<Relationship> relationships;
    +
    +    private volatile Sink sink;
    +    private volatile NifiSinkSessionChannel channel;
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG);
    +        this.relationships = ImmutableSet.of(SUCCESS, FAILURE);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final SchedulingContext context) {
    +        try {
    +            channel = new NifiSinkSessionChannel(SUCCESS, FAILURE);
    +            Configurables.configure(channel, new Context());
    +            channel.start();
    +
    +            sink = SINK_FACTORY.create(context.getProperty(SOURCE_NAME).getValue(),
    +                    context.getProperty(SINK_TYPE).getValue());
    +            sink.setChannel(channel);
    +
    +            String flumeConfig = context.getProperty(FLUME_CONFIG).getValue();
    +            String agentName = context.getProperty(AGENT_NAME).getValue();
    +            String sinkName = context.getProperty(SOURCE_NAME).getValue();
    +            Configurables.configure(sink,
    +                    getFlumeSinkContext(flumeConfig, agentName, sinkName));
    +
    +            sink.start();
    +        } catch (Throwable th) {
    +            getLogger().error("Error creating sink", th);
    +            throw Throwables.propagate(th);
    +        }
    +    }
    +
    +    @OnUnscheduled
    +    public void unScheduled() {
    +        sink.stop();
    +        channel.stop();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context,
    +            final ProcessSession session) throws ProcessException {
    +
    +        channel.setSession(session);
    +        try {
    +            if (sink.process() == Sink.Status.BACKOFF) {
    --- End diff --
    
    It'd be nice if we could expose the counters that Flume uses internally to the NiFi UI. If APIs for that don't exist yet, then there's no reason this feature needs to be blocked by that.



[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r32166926
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java ---
    @@ -0,0 +1,146 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +import com.google.common.base.Throwables;
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.ImmutableSet;
    +import java.util.List;
    +import java.util.Set;
    +import org.apache.flume.Context;
    +import org.apache.flume.EventDeliveryException;
    +import org.apache.flume.Sink;
    +import org.apache.flume.conf.Configurables;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.SchedulingContext;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +/**
    + * This processor runs a Flume sink
    + */
    +@Tags({"flume", "hadoop", "get", "sink"})
    +@CapabilityDescription("Write FlowFile data to a Flume sink")
    +public class FlumeSinkProcessor extends AbstractFlumeProcessor {
    +
    +    public static final PropertyDescriptor SINK_TYPE = new PropertyDescriptor.Builder()
    +            .name("Sink Type")
    +            .description("The fully-qualified name of the Sink class")
    +            .required(true)
    +            .addValidator(createSinkValidator())
    +            .build();
    +    public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder()
    +            .name("Agent Name")
    +            .description("The name of the agent used in the Flume sink configuration")
    +            .required(true)
    +            .defaultValue("tier1")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder()
    +            .name("Sink Name")
    +            .description("The name of the sink used in the Flume sink configuration")
    +            .required(true)
    +            .defaultValue("sink-1")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder()
    +            .name("Flume Configuration")
    +            .description("The Flume configuration for the sink copied from the flume.properties file")
    +            .required(true)
    +            .defaultValue("")
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
    +    public static final Relationship FAILURE = new Relationship.Builder().name("failure").build();
    +
    +    private List<PropertyDescriptor> descriptors;
    +    private Set<Relationship> relationships;
    +
    +    private volatile Sink sink;
    +    private volatile NifiSinkSessionChannel channel;
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG);
    +        this.relationships = ImmutableSet.of(SUCCESS, FAILURE);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final SchedulingContext context) {
    +        try {
    +            channel = new NifiSinkSessionChannel(SUCCESS, FAILURE);
    +            Configurables.configure(channel, new Context());
    --- End diff --
    
    This probably isn't needed now that you are using a custom channel implementation.



[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joey <gi...@git.apache.org>.
Github user joey commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-120785272
  
    Alright, docs pushed. This latest version passes the contrib-check so let me know if you think it needs anything else.
    
    Otherwise, I'll do another rebase and squash the commits into one.
    
    I can also wait to do the rebase and squash until after the 0.2.0 release so that this will be ready to merge after that.



[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-110921016
  
    +1
    
    Looks great!



[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by busbey <gi...@git.apache.org>.
Github user busbey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r29687251
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site.xml ---
    @@ -0,0 +1,25 @@
    +<?xml version="1.0"?>
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements.  See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License.  You may obtain a copy of the License at
    +      http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    +
    +<!-- Put site-specific property overrides in this file. -->
    +
    +<configuration>
    +    <property>
    +        <name>fs.defaultFS</name>
    +        <value>file:///</value>
    --- End diff --
    
    could we make this a filtered file so that we can put the defaultFS path into the build directory?



[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r31855489
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java ---
    @@ -0,0 +1,193 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +import com.google.common.base.Throwables;
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.ImmutableSet;
    +import com.google.common.collect.Lists;
    +import java.util.List;
    +import java.util.Set;
    +import org.apache.flume.Context;
    +import org.apache.flume.EventDeliveryException;
    +import org.apache.flume.Sink;
    +import org.apache.flume.Transaction;
    +import org.apache.flume.channel.MemoryChannel;
    +import org.apache.flume.conf.Configurables;
    +import org.apache.jasper.compiler.JspUtil;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.SchedulingContext;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.flume.util.FlowFileEvent;
    +
    +/**
    + * This processor runs a Flume sink
    + */
    +@Tags({"flume", "hadoop", "get", "sink"})
    +@CapabilityDescription("Generate FlowFile data from a Flume sink")
    +public class FlumeSinkProcessor extends AbstractFlumeProcessor {
    +
    +    private Sink sink;
    +    private MemoryChannel channel;
    +
    +    public static final PropertyDescriptor SINK_TYPE = new PropertyDescriptor.Builder()
    +            .name("Sink Type")
    +            .description("The fully-qualified name of the Sink class")
    +            .required(true)
    +            .addValidator(createSinkValidator())
    +            .build();
    +    public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder()
    +            .name("Agent Name")
    +            .description("The name of the agent used in the Flume sink configuration")
    +            .required(true)
    +            .defaultValue("tier1")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder()
    +            .name("Sink Name")
    +            .description("The name of the sink used in the Flume sink configuration")
    +            .required(true)
    +            .defaultValue("sink-1")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder()
    +            .name("Flume Configuration")
    +            .description("The Flume configuration for the sink copied from the flume.properties file")
    +            .required(true)
    +            .defaultValue("")
    +            .addValidator(Validator.VALID)
    +            .build();
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("Batch Size")
    +            .description("The number of FlowFiles to process in a single batch")
    +            .required(true)
    +            .defaultValue("100")
    +            .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
    +    public static final Relationship FAILURE = new Relationship.Builder().name("failure").build();
    +
    +    private List<PropertyDescriptor> descriptors;
    +    private Set<Relationship> relationships;
    +    private int batchSize;
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG, BATCH_SIZE);
    +        this.relationships = ImmutableSet.of(SUCCESS, FAILURE);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final SchedulingContext context) {
    +        batchSize = context.getProperty(BATCH_SIZE).asInteger();
    --- End diff --
    
    Fields initialized in @OnScheduled, such as batchSize here, should be volatile.
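
A minimal sketch of the point above, assuming the scheduling callback and the trigger call can run on different threads. `BatchingProcessorSketch` and its methods are hypothetical stand-ins and do not use the NiFi API.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a processor; it does not use the NiFi API.
class BatchingProcessorSketch {

    // volatile: written by the scheduling thread, read by worker threads.
    private volatile int batchSize;

    // Plays the role of the @OnScheduled callback.
    void onScheduled(int configuredBatchSize) {
        batchSize = configuredBatchSize;
    }

    // Plays the role of onTrigger(); may run on any worker thread.
    int onTrigger() {
        return batchSize;
    }

    // Schedules on the calling thread, then triggers once on a worker thread.
    static int scheduleThenTrigger(int configuredBatchSize) {
        BatchingProcessorSketch processor = new BatchingProcessorSketch();
        processor.onScheduled(configuredBatchSize);
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> trigger = processor::onTrigger;
            return worker.submit(trigger).get();
        } catch (Exception e) {
            throw new IllegalStateException("trigger failed", e);
        } finally {
            worker.shutdown();
        }
    }
}
```

In this sketch the executor hand-off already orders the write before the read. The volatile modifier matters when the framework gives no such guarantee between the two callbacks.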



[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r32168957
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java ---
    @@ -0,0 +1,203 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +import com.google.common.base.Throwables;
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.ImmutableSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +import org.apache.flume.EventDeliveryException;
    +import org.apache.flume.EventDrivenSource;
    +import org.apache.flume.PollableSource;
    +import org.apache.flume.Source;
    +import org.apache.flume.channel.ChannelProcessor;
    +import org.apache.flume.conf.Configurables;
    +import org.apache.flume.source.EventDrivenSourceRunner;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.SchedulingContext;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +/**
    + * This processor runs a Flume source
    + */
    +@Tags({"flume", "hadoop", "get", "source"})
    +@CapabilityDescription("Generate FlowFile data from a Flume source")
    +public class FlumeSourceProcessor extends AbstractFlumeProcessor {
    +
    +    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder()
    +        .name("Source Type")
    +        .description("The fully-qualified name of the Source class")
    +        .required(true)
    +        .addValidator(createSourceValidator())
    +        .build();
    +    public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder()
    +        .name("Agent Name")
    +        .description("The name of the agent used in the Flume source configuration")
    +        .required(true)
    +        .defaultValue("tier1")
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +    public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder()
    +        .name("Source Name")
    +        .description("The name of the source used in the Flume source configuration")
    +        .required(true)
    +        .defaultValue("src-1")
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +    public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder()
    +        .name("Flume Configuration")
    +        .description("The Flume configuration for the source copied from the flume.properties file")
    +        .required(true)
    +        .defaultValue("")
    +        .addValidator(Validator.VALID)
    +        .build();
    +
    +    public static final Relationship SUCCESS = new Relationship.Builder().name("success")
    +        .build();
    +
    +    private List<PropertyDescriptor> descriptors;
    +    private Set<Relationship> relationships;
    +
    +    private volatile Source source;
    +
    +    private final NifiSessionChannel pollableSourceChannel = new NifiSessionChannel(SUCCESS);
    +    private final AtomicReference<ProcessSessionFactory> sessionFactoryRef = new AtomicReference<>(null);
    +    private final AtomicReference<EventDrivenSourceRunner> runnerRef = new AtomicReference<>(null);
    +    private final AtomicReference<NifiSessionFactoryChannel> eventDrivenSourceChannelRef = new AtomicReference<>(null);
    +    private final AtomicReference<Boolean> stopping = new AtomicReference<>(false);
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        this.descriptors = ImmutableList.of(SOURCE_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG);
    +        this.relationships = ImmutableSet.of(SUCCESS);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final SchedulingContext context) {
    +        try {
    +            stopping.set(false);
    +            source = SOURCE_FACTORY.create(
    +                context.getProperty(SOURCE_NAME)
    +                .getValue(),
    +                context.getProperty(SOURCE_TYPE)
    +                .getValue());
    +
    +            String flumeConfig = context.getProperty(FLUME_CONFIG)
    +                .getValue();
    +            String agentName = context.getProperty(AGENT_NAME)
    +                .getValue();
    +            String sourceName = context.getProperty(SOURCE_NAME)
    +                .getValue();
    +            Configurables.configure(source,
    +                getFlumeSourceContext(flumeConfig, agentName, sourceName));
    +
    +            if (source instanceof PollableSource) {
    +                source.setChannelProcessor(new ChannelProcessor(
    +                    new NifiChannelSelector(pollableSourceChannel)));
    +                source.start();
    --- End diff --
    
    Oops! I thought this was like before where the event driven source had extra initialization. Never mind.



[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-116191838
  
    > Is there any service provider construct or similar mechanism whereby we could provide a dropdown of identified sources and sinks
    
    I think this is primarily intended to allow people to use their current Flume config inside of NiFi and pave the way for more seamless integration as a next step. I would say we want to keep this as-is for compatibility (just paste in your current config) and create processors to wrap the widely used Flume components, like the Avro source.
    
    > I think because channel.setSession(…) is being called as it is during the onTrigger method this means the processor needs to run serially
    
    Good point. Maybe we can keep a thread-local cache of the sessions and pass an accessor to the channel instead? Then we would be able to keep parallel execution.
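
The thread-local idea could look roughly like the sketch below. It assumes the sink calls back into the channel on the same thread that entered onTrigger. `SessionSketch`, `ChannelSketch`, and `SinkProcessorSketch` are hypothetical names for illustration, not NiFi or Flume classes.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a per-invocation session.
class SessionSketch {
    final String owner;

    SessionSketch(String owner) {
        this.owner = owner;
    }
}

// Hypothetical channel that holds an accessor instead of a session.
class ChannelSketch {
    private final Supplier<SessionSketch> sessions;

    ChannelSketch(Supplier<SessionSketch> sessions) {
        this.sessions = sessions;
    }

    // Resolves the session bound to the calling thread.
    String take() {
        return sessions.get().owner;
    }
}

class SinkProcessorSketch {
    private final ThreadLocal<SessionSketch> current = new ThreadLocal<>();
    private final ChannelSketch channel = new ChannelSketch(current::get);

    // Plays the role of onTrigger(): bind the session, process, always unbind.
    String onTrigger(SessionSketch session) {
        current.set(session);
        try {
            return channel.take();
        } finally {
            current.remove();
        }
    }

    // Runs two triggers on separate threads and returns what each one saw.
    static String[] runTwoInParallel() {
        SinkProcessorSketch processor = new SinkProcessorSketch();
        String[] seen = new String[2];
        Thread first = new Thread(() -> seen[0] = processor.onTrigger(new SessionSketch("first")));
        Thread second = new Thread(() -> seen[1] = processor.onTrigger(new SessionSketch("second")));
        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }
}
```

Because the channel keeps no session of its own, concurrent triggers do not overwrite each other's state. If a sink hands work to its own threads, this approach would not apply as written.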



[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r32173669
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java ---
    @@ -0,0 +1,146 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +import com.google.common.base.Throwables;
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.ImmutableSet;
    +import java.util.List;
    +import java.util.Set;
    +import org.apache.flume.Context;
    +import org.apache.flume.EventDeliveryException;
    +import org.apache.flume.Sink;
    +import org.apache.flume.conf.Configurables;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.SchedulingContext;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +/**
    + * This processor runs a Flume sink
    + */
    +@Tags({"flume", "hadoop", "get", "sink"})
    +@CapabilityDescription("Write FlowFile data to a Flume sink")
    +public class FlumeSinkProcessor extends AbstractFlumeProcessor {
    +
    +    public static final PropertyDescriptor SINK_TYPE = new PropertyDescriptor.Builder()
    +            .name("Sink Type")
    +            .description("The fully-qualified name of the Sink class")
    +            .required(true)
    +            .addValidator(createSinkValidator())
    +            .build();
    +    public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder()
    +            .name("Agent Name")
    +            .description("The name of the agent used in the Flume sink configuration")
    +            .required(true)
    +            .defaultValue("tier1")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder()
    +            .name("Sink Name")
    +            .description("The name of the sink used in the Flume sink configuration")
    +            .required(true)
    +            .defaultValue("sink-1")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder()
    +            .name("Flume Configuration")
    +            .description("The Flume configuration for the sink copied from the flume.properties file")
    +            .required(true)
    +            .defaultValue("")
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
    +    public static final Relationship FAILURE = new Relationship.Builder().name("failure").build();
    +
    +    private List<PropertyDescriptor> descriptors;
    +    private Set<Relationship> relationships;
    +
    +    private volatile Sink sink;
    +    private volatile NifiSinkSessionChannel channel;
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG);
    +        this.relationships = ImmutableSet.of(SUCCESS, FAILURE);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final SchedulingContext context) {
    +        try {
    +            channel = new NifiSinkSessionChannel(SUCCESS, FAILURE);
    +            Configurables.configure(channel, new Context());
    +            channel.start();
    +
    +            sink = SINK_FACTORY.create(context.getProperty(SOURCE_NAME).getValue(),
    +                    context.getProperty(SINK_TYPE).getValue());
    +            sink.setChannel(channel);
    +
    +            String flumeConfig = context.getProperty(FLUME_CONFIG).getValue();
    +            String agentName = context.getProperty(AGENT_NAME).getValue();
    +            String sinkName = context.getProperty(SOURCE_NAME).getValue();
    +            Configurables.configure(sink,
    +                    getFlumeSinkContext(flumeConfig, agentName, sinkName));
    +
    +            sink.start();
    +        } catch (Throwable th) {
    +            getLogger().error("Error creating sink", th);
    +            throw Throwables.propagate(th);
    +        }
    +    }
    +
    +    @OnUnscheduled
    +    public void unScheduled() {
    +        sink.stop();
    +        channel.stop();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context,
    +            final ProcessSession session) throws ProcessException {
    +
    +        channel.setSession(session);
    +        try {
    +            if (sink.process() == Sink.Status.BACKOFF) {
    --- End diff --
    
    Agreed, this doesn't need to be blocked. I also just looked at the [implementation in the DatasetSink](https://github.com/apache/flume/blob/trunk/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java#L347) and it doesn't look useful anyway. The counters are about the internal batching, which is pretty much all handled by NiFi anyway. Also, the sink has to get its own counter object, so it looks really hard to forward that information to NiFi's counters.



[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-120741524
  
    Joey,
    
    Love it and agree with all the points made.
    
    Ryan, Sean, Mark,
    
    Thanks for the great review effort/input on this.
    
    Thanks
    Joe



[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-nifi/pull/51



[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joey <gi...@git.apache.org>.
Github user joey commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-109357383
  
    Sorry for the delay, I should have a second iteration of the patch this afternoon PT.
    
    Do you prefer squashing on updates or waiting until the patch is accepted and doing the squash shortly before commit?



[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-110906821
  
    @joey generally @OnStopped is the best place to do something like that. If you want to call onTrigger() multiple times without @OnScheduled and @OnUnscheduled being called, you can pass in flags to the run() method to specify this. Alternatively, if it makes sense in your unit test, you can use .run(int) to run some specified number of times.
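
A rough model of the flag-based approach described above. `LifecycleRunnerSketch` and its `run(iterations, stopOnFinish, initialize)` signature are hypothetical and only imitate the idea. This is not the nifi-mock TestRunner, whose exact method signatures should be checked against its documentation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a test runner's lifecycle handling.
class LifecycleRunnerSketch {
    final List<String> calls = new ArrayList<>();

    // initialize: invoke the scheduled callback before triggering.
    // stopOnFinish: invoke the unscheduled and stopped callbacks afterwards.
    void run(int iterations, boolean stopOnFinish, boolean initialize) {
        if (initialize) {
            calls.add("onScheduled");
        }
        for (int i = 0; i < iterations; i++) {
            calls.add("onTrigger");
        }
        if (stopOnFinish) {
            calls.add("onUnscheduled");
            calls.add("onStopped");
        }
    }

    // Triggers three times across separate run() calls within one lifecycle.
    static List<String> threeTriggersOneLifecycle() {
        LifecycleRunnerSketch runner = new LifecycleRunnerSketch();
        runner.run(1, false, true);   // schedule, trigger once, stay scheduled
        runner.run(1, false, false);  // trigger again without rescheduling
        runner.run(1, true, false);   // final trigger, then stop
        return runner.calls;
    }
}
```

A test can then assert on intermediate state between triggers while the component stays scheduled, and cleanup still runs exactly once at the end.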



[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-120922076
  
    Joey,
    
    I just took another visual pass through it all.  I'm not sure what the emoticon for crying tears of joy is, but when I saw the license/notice and documentation included... I basically needed that emoticon.
    
    Will run through a few test scenarios and pull in the code and get back to you.  This week is a little wonky work-wise, so unless someone else picks up the ball, the response might be a little slower than this deserves.
    
    Thanks
    Joe



[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by busbey <gi...@git.apache.org>.
Github user busbey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r29672465
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEventConstants.java ---
    @@ -0,0 +1,25 @@
    +
    --- End diff --
    
    Class needs a license header.



[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-116828986
  
    Ryan,
    
    OK, got it.  I think I'll need to see it once it is implemented to understand it better.  But so long as you understand the tradespace, we should be good.
    
    Thanks
    Joe



[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r31857255
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java ---
    @@ -0,0 +1,191 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +import com.google.common.base.Throwables;
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.ImmutableSet;
    +import java.util.List;
    +import java.util.Set;
    +import org.apache.flume.Context;
    +import org.apache.flume.Event;
    +import org.apache.flume.EventDeliveryException;
    +import org.apache.flume.EventDrivenSource;
    +import org.apache.flume.PollableSource;
    +import org.apache.flume.Source;
    +import org.apache.flume.SourceRunner;
    +import org.apache.flume.Transaction;
    +import org.apache.flume.channel.ChannelProcessor;
    +import org.apache.flume.channel.MemoryChannel;
    +import org.apache.flume.conf.Configurables;
    +import org.apache.flume.source.EventDrivenSourceRunner;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.SchedulingContext;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +/**
    + * This processor runs a Flume source
    + */
    +@Tags({"flume", "hadoop", "get", "source"})
    +@CapabilityDescription("Generate FlowFile data from a Flume source")
    +public class FlumeSourceProcessor extends AbstractFlumeProcessor {
    +
    +    private Source source;
    +    private SourceRunner runner;
    +    private MemoryChannel channel;
    +
    +    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder()
    +            .name("Source Type")
    +            .description("The fully-qualified name of the Source class")
    +            .required(true)
    +            .addValidator(createSourceValidator())
    +            .build();
    +    public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder()
    +            .name("Agent Name")
    +            .description("The name of the agent used in the Flume source configuration")
    +            .required(true)
    +            .defaultValue("tier1")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder()
    +            .name("Source Name")
    +            .description("The name of the source used in the Flume source configuration")
    +            .required(true)
    +            .defaultValue("src-1")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder()
    +            .name("Flume Configuration")
    +            .description("The Flume configuration for the source copied from the flume.properties file")
    +            .required(true)
    +            .defaultValue("")
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
    +
    +    private List<PropertyDescriptor> descriptors;
    +    private Set<Relationship> relationships;
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        this.descriptors = ImmutableList.of(SOURCE_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG);
    +        this.relationships = ImmutableSet.of(SUCCESS);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final SchedulingContext context) {
    +        try {
    +            source = SOURCE_FACTORY.create(
    +                    context.getProperty(SOURCE_NAME).getValue(),
    +                    context.getProperty(SOURCE_TYPE).getValue());
    +
    +            String flumeConfig = context.getProperty(FLUME_CONFIG).getValue();
    +            String agentName = context.getProperty(AGENT_NAME).getValue();
    +            String sourceName = context.getProperty(SOURCE_NAME).getValue();
    +            Configurables.configure(source,
    +                    getFlumeSourceContext(flumeConfig, agentName, sourceName));
    +
    +            if (source instanceof EventDrivenSource) {
    +                runner = new EventDrivenSourceRunner();
    +                channel = new MemoryChannel();
    +                Configurables.configure(channel, new Context());
    +                channel.start();
    +                source.setChannelProcessor(new ChannelProcessor(new NifiChannelSelector(channel)));
    +                runner.setSource(source);
    +                runner.start();
    +            }
    +        } catch (Throwable th) {
    +            getLogger().error("Error creating source", th);
    +            throw Throwables.propagate(th);
    +        }
    +    }
    +
    +    @OnUnscheduled
    +    public void unScheduled() {
    +        if (runner != null) {
    +            runner.stop();
    +        }
    +        if (channel != null) {
    +            channel.stop();
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context,
    +            final ProcessSession session) throws ProcessException {
    +        if (source instanceof EventDrivenSource) {
    +            onEventDrivenTrigger(context, session);
    +        } else if (source instanceof PollableSource) {
    +            onPollableTrigger((PollableSource) source, context, session);
    +        }
    +    }
    +
    +    public void onPollableTrigger(final PollableSource pollableSource,
    +            final ProcessContext context, final ProcessSession session)
    +            throws ProcessException {
    +        try {
    +            pollableSource.setChannelProcessor(new ChannelProcessor(
    +                    new NifiChannelSelector(new NifiChannel(session, SUCCESS))));
    +            pollableSource.start();
    +            pollableSource.process();
    +            pollableSource.stop();
    +        } catch (EventDeliveryException ex) {
    +            throw new ProcessException("Error processing pollable source", ex);
    +        }
    +    }
    +
    +    public void onEventDrivenTrigger(final ProcessContext context, final ProcessSession session) {
    +        Transaction transaction = channel.getTransaction();
    +        transaction.begin();
    +
    +        try {
    +            Event event = channel.take();
    --- End diff --
    
    Should this get all of the available events from the channel, or at least more than one? The event-driven source "provides its own event-driven mechanism to invoke event processing". That sounds like it will produce events on this channel without outside intervention, and it may have produced many between one NiFi trigger of the processor and the next. If that's the case, then I think this needs to consume as many as possible.
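
The "consume as many as possible" suggestion can be pictured as a bounded drain loop. This is only a sketch under stated assumptions: a plain `java.util.Queue` stands in for the Flume channel, and `ChannelDrainSketch`, `drain`, and `maxBatch` are hypothetical names that appear neither in the patch nor in the Flume API. In the real processor the loop would presumably run inside the channel transaction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in: a plain Queue plays the role of the Flume channel.
class ChannelDrainSketch {

    // Takes events until the queue is empty or maxBatch events were collected.
    static <E> List<E> drain(Queue<E> channel, int maxBatch) {
        List<E> batch = new ArrayList<>();
        E event;
        while (batch.size() < maxBatch && (event = channel.poll()) != null) {
            batch.add(event);
        }
        return batch;
    }

    public static void main(String[] args) {
        Queue<String> channel = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < 5; i++) {
            channel.add("event-" + i);
        }
        // Two triggers with a batch limit of 3 empty a queue of 5 events.
        List<String> first = drain(channel, 3);
        List<String> second = drain(channel, 3);
        System.out.println(first.size() + " then " + second.size()); // prints "3 then 2"
    }
}
```

The batch limit keeps a single trigger from holding the thread indefinitely when the source produces events faster than they are consumed.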


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-110442669
  
    Joey - I appreciate that you don't have a ton of time to do this and that it is by no means a trivial patch.  I am not in love with the conversation flow here, in terms of following what is being said or which questions are outstanding.  If there are discussion items you'd like to go over, please do feel free to shoot a conversation starter to dev@nifi.iao.  If that's not needed, that is cool too.


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by busbey <gi...@git.apache.org>.
Github user busbey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r29691933
  
    --- Diff: nifi/pom.xml ---
    @@ -789,6 +789,11 @@
                     <artifactId>nifi-geo-nar</artifactId>
                     <version>0.1.0-incubating-SNAPSHOT</version>
                     <type>nar</type>
    +            <dependency>
    --- End diff --
    
    this change misses closing the geo-nar dependency line, which breaks compilation.


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joey <gi...@git.apache.org>.
Github user joey commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-109359481
  
    Do you want all commits squashed or just the incremental commit? I ask because some people prefer to always review the entire patch as is, and others prefer to see only the changes since the last review. I can easily do either.


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-109358814
  
    Squash for the review is easier.  But, seriously, we can deal either way.


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-120764330
  
    +1. Thanks Joey!


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by busbey <gi...@git.apache.org>.
Github user busbey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r29672345
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiTransaction.java ---
    @@ -0,0 +1,40 @@
    +
    --- End diff --
    
    class needs a license header.


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joey <gi...@git.apache.org>.
Github user joey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r31842715
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java ---
    @@ -0,0 +1,165 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +import java.io.File;
    +import static org.junit.Assert.assertEquals;
    +
    +import java.io.FileInputStream;
    +import java.io.FilenameFilter;
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import org.apache.commons.io.filefilter.HiddenFileFilter;
    +import org.apache.flume.sink.NullSink;
    +import org.apache.flume.source.AvroSource;
    +
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.util.MockProcessContext;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.apache.nifi.util.file.FileUtils;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class FlumeSinkProcessorTest {
    +
    +  private static final Logger logger =
    +      LoggerFactory.getLogger(FlumeSinkProcessorTest.class);
    +  
    +    @Test
    +    public void testValidators() {
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
    +        Collection<ValidationResult> results;
    +        ProcessContext pc;
    +
    +        results = new HashSet<>();
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because Sink Type is required"));
    +        }
    +
    +        // non-existent class
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, "invalid.class.name");
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because unable to load sink"));
    +        }
    +
    +        // class doesn't implement Sink
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, AvroSource.class.getName());
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because unable to create sink"));
    +        }
    +
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(0, results.size());
    +    }
    +
    +
    +    @Test
    +    public void testNullSink() throws IOException {
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
    +        FileInputStream fis = new FileInputStream("src/test/resources/testdata/records.txt");
    +        Map<String, String> attributes = new HashMap<>();
    +        attributes.put(CoreAttributes.FILENAME.key(), "records.txt");
    +        runner.enqueue(fis, attributes);
    +        runner.run();
    +        fis.close();
    +    }
    +
    +    @Test
    +    public void testBatchSize() throws IOException {
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
    +        runner.setProperty(FlumeSinkProcessor.BATCH_SIZE, "1000");
    +        runner.setProperty(FlumeSinkProcessor.FLUME_CONFIG,
    +            "tier1.sinks.sink-1.batchSize = 1000\n");
    +        for (int i = 0; i < 100000; i++) {
    +          runner.enqueue(String.valueOf(i).getBytes());
    +        }
    +        runner.run();
    +    }
    +    
    +    @Test
    +    public void testHdfsSink() throws IOException {
    +        File destDir = new File("target/hdfs");
    --- End diff --
    
    That's not how I read the javadoc:
    > The TemporaryFolder Rule allows creation of files and folders that should be deleted when the test method finishes (whether it passes or fails). Whether the deletion is successful or not is not checked by this rule. No exception will be thrown in case the deletion fails.
    
    In particular the phrase "Whether the deletion is successful or not is not checked by this rule." worries me. If you're confident this works as expected I'm ok with it. These tests don't write a lot of data to the filesystem but I worry about cases where a test crashes mid run and could potentially leave a large amount of temporary data on the user's system.
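
For comparison, here is a minimal sketch of cleanup whose failure is not silent. It uses only `java.nio.file` and is not taken from the patch or from JUnit; `TempDirCleanupSketch`, `deleteRecursively`, and `writeAndCleanUp` are hypothetical names. `Files.delete` throws when a path cannot be removed, so a failed deletion surfaces as an exception. Like any in-process cleanup, it does not help if the JVM itself crashes mid-run.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not part of the patch under review.
class TempDirCleanupSketch {

    // Deletes children before parents; Files.delete throws if a path cannot be removed.
    static void deleteRecursively(Path root) throws IOException {
        if (!Files.exists(root)) {
            return;
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(root)) {
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }

    // Writes a small file into a fresh temp directory, then always cleans up.
    static Path writeAndCleanUp() {
        try {
            Path dir = Files.createTempDirectory("flume-sink-test");
            try {
                Files.write(dir.resolve("records.txt"),
                        "a\nb\n".getBytes(StandardCharsets.UTF_8));
                return dir;
            } finally {
                deleteRecursively(dir);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path dir = writeAndCleanUp();
        System.out.println("still exists: " + Files.exists(dir)); // prints "still exists: false"
    }
}
```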


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joey <gi...@git.apache.org>.
Github user joey commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-110892315
  
    @rdblue and @joewitt See the latest. I think this addresses all of Ryan's feedback. I do have one open question:
    
    Currently I'm stopping the Sinks and Sources in an `@OnUnscheduled` callback, but I'm not sure if that's the right place to do it. I'm not sure about the intention of the API generally, but the `TestRunner#run()` method will call `@OnUnscheduled` callbacks immediately after the first call to `Processor#onTrigger()`. That means that effectively only a single `onTrigger()` happens in any of my tests. Should this be delayed until an `@OnStopped` callback?


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-116310453
  
    > Good point. Maybe we can keep a thread-local cache of the sessions and pass an accessor to the channel instead? Then we would be able to keep parallel execution.
    
    You'll want to find a mechanism other than thread-local.  There are basically zero guarantees about which thread you will get between 'onTrigger' calls.  Now, if the processor takes a thread and doesn't return it to the controller, you can, but that is pretty naughty.
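
The thread-affinity point can be illustrated without any NiFi classes. In the sketch below, which is an assumption-laden toy and not code from the patch, a value is stored on one worker thread and read back on another: the `ThreadLocal` copy is invisible to the second thread, while a shared `AtomicReference` is not. `SharedSessionRefSketch` and its members are hypothetical names, and the string merely stands in for a session.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo: a String stands in for a session object.
class SharedSessionRefSketch {

    static final ThreadLocal<String> THREAD_LOCAL = new ThreadLocal<>();
    static final AtomicReference<String> SHARED = new AtomicReference<>();

    // Stores the value on one thread and reads it on a different thread.
    // Returns { value seen via ThreadLocal, value seen via AtomicReference }.
    static String[] storeThenReadOnAnotherThread(String value) {
        ExecutorService writer = Executors.newSingleThreadExecutor();
        ExecutorService reader = Executors.newSingleThreadExecutor();
        try {
            writer.submit(() -> {
                THREAD_LOCAL.set(value);
                SHARED.set(value);
            }).get();
            return reader.submit(
                    () -> new String[] {THREAD_LOCAL.get(), SHARED.get()}).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            writer.shutdown();
            reader.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] seen = storeThenReadOnAnotherThread("session-1");
        System.out.println("thread-local: " + seen[0]); // prints "thread-local: null"
        System.out.println("shared: " + seen[1]);       // prints "shared: session-1"
    }
}
```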


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joey <gi...@git.apache.org>.
Github user joey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r32169514
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java ---
    @@ -0,0 +1,203 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +import com.google.common.base.Throwables;
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.ImmutableSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +import org.apache.flume.EventDeliveryException;
    +import org.apache.flume.EventDrivenSource;
    +import org.apache.flume.PollableSource;
    +import org.apache.flume.Source;
    +import org.apache.flume.channel.ChannelProcessor;
    +import org.apache.flume.conf.Configurables;
    +import org.apache.flume.source.EventDrivenSourceRunner;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.SchedulingContext;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +/**
    + * This processor runs a Flume source
    + */
    +@Tags({"flume", "hadoop", "get", "source"})
    +@CapabilityDescription("Generate FlowFile data from a Flume source")
    +public class FlumeSourceProcessor extends AbstractFlumeProcessor {
    +
    +    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder()
    +        .name("Source Type")
    +        .description("The fully-qualified name of the Source class")
    +        .required(true)
    +        .addValidator(createSourceValidator())
    +        .build();
    +    public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder()
    +        .name("Agent Name")
    +        .description("The name of the agent used in the Flume source configuration")
    +        .required(true)
    +        .defaultValue("tier1")
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +    public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder()
    +        .name("Source Name")
    +        .description("The name of the source used in the Flume source configuration")
    +        .required(true)
    +        .defaultValue("src-1")
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +    public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder()
    +        .name("Flume Configuration")
    +        .description("The Flume configuration for the source copied from the flume.properties file")
    +        .required(true)
    +        .defaultValue("")
    +        .addValidator(Validator.VALID)
    +        .build();
    +
    +    public static final Relationship SUCCESS = new Relationship.Builder().name("success")
    +        .build();
    +
    +    private List<PropertyDescriptor> descriptors;
    +    private Set<Relationship> relationships;
    +
    +    private volatile Source source;
    +
    +    private final NifiSessionChannel pollableSourceChannel = new NifiSessionChannel(SUCCESS);
    +    private final AtomicReference<ProcessSessionFactory> sessionFactoryRef = new AtomicReference<>(null);
    +    private final AtomicReference<EventDrivenSourceRunner> runnerRef = new AtomicReference<>(null);
    +    private final AtomicReference<NifiSessionFactoryChannel> eventDrivenSourceChannelRef = new AtomicReference<>(null);
    +    private final AtomicReference<Boolean> stopping = new AtomicReference<>(false);
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        this.descriptors = ImmutableList.of(SOURCE_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG);
    +        this.relationships = ImmutableSet.of(SUCCESS);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final SchedulingContext context) {
    +        try {
    +            stopping.set(false);
    +            source = SOURCE_FACTORY.create(
    +                context.getProperty(SOURCE_NAME)
    +                .getValue(),
    +                context.getProperty(SOURCE_TYPE)
    +                .getValue());
    +
    +            String flumeConfig = context.getProperty(FLUME_CONFIG)
    +                .getValue();
    +            String agentName = context.getProperty(AGENT_NAME)
    +                .getValue();
    +            String sourceName = context.getProperty(SOURCE_NAME)
    +                .getValue();
    +            Configurables.configure(source,
    +                getFlumeSourceContext(flumeConfig, agentName, sourceName));
    +
    +            if (source instanceof PollableSource) {
    +                source.setChannelProcessor(new ChannelProcessor(
    +                    new NifiChannelSelector(pollableSourceChannel)));
    +                source.start();
    +            }
    +        } catch (Throwable th) {
    +            getLogger()
    +                .error("Error creating source", th);
    +            throw Throwables.propagate(th);
    +        }
    +    }
    +
    +    @OnUnscheduled
    +    public void unScheduled() {
    +        stopping.set(true);
    +        if (source instanceof PollableSource) {
    +            source.stop();
    +        } else {
    +            EventDrivenSourceRunner runner = runnerRef.get();
    +            if (runner != null) {
    +                runner.stop();
    +                runnerRef.compareAndSet(runner, null);
    +            }
    +
    +            NifiSessionFactoryChannel eventDrivenSourceChannel = eventDrivenSourceChannelRef.get();
    +            if (eventDrivenSourceChannel != null) {
    +                eventDrivenSourceChannel.stop();
    +                eventDrivenSourceChannelRef.compareAndSet(eventDrivenSourceChannel, null);
    +            }
    +        }
    +    }
    +
    +    @OnStopped
    +    public void stopped() {
    +        sessionFactoryRef.set(null);
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
    +        if (source instanceof PollableSource) {
    +            super.onTrigger(context, sessionFactory);
    +        } else if (source instanceof EventDrivenSource) {
    +            ProcessSessionFactory old = sessionFactoryRef.getAndSet(sessionFactory);
    +            if (old == null) {
    --- End diff --
    
    I based this on how the `ListenUdp` processor is written. I'm happy to throw an exception if it changes out from under us as that should only happen after the processor is stopped and started again and the `@OnStopped` method sets it back to `null`.
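
The "throw an exception if it changes out from under us" idea might look roughly like the following. This is a sketch with hypothetical names (`FactoryGuardSketch`, `register`, `reset`), not the code in the patch and not how `ListenUdp` is written; the type parameter `F` stands in for the session factory type, and `reset` plays the role of the `@OnStopped` method.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical guard around a factory reference.
class FactoryGuardSketch<F> {

    private final AtomicReference<F> ref = new AtomicReference<>();

    // Returns true if this call installed the factory, false if the same
    // instance was already installed, and throws if a different one was.
    boolean register(F factory) {
        if (ref.compareAndSet(null, factory)) {
            return true;
        }
        if (ref.get() != factory) {
            throw new IllegalStateException("session factory changed while running");
        }
        return false;
    }

    // Counterpart of the stop callback: forget the factory so the next start can install one.
    void reset() {
        ref.set(null);
    }

    public static void main(String[] args) {
        FactoryGuardSketch<Object> guard = new FactoryGuardSketch<>();
        Object factory = new Object();
        System.out.println(guard.register(factory)); // prints "true"
        System.out.println(guard.register(factory)); // prints "false"
        guard.reset();
        System.out.println(guard.register(new Object())); // prints "true"
    }
}
```

A `reset` that races with `register` can still trip the exception in this simplified version, which is acceptable only if stop and trigger never overlap.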


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by busbey <gi...@git.apache.org>.
Github user busbey commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r29692695
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml ---
    @@ -0,0 +1,126 @@
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <!--
    +      Licensed to the Apache Software Foundation (ASF) under one or more
    +      contributor license agreements.  See the NOTICE file distributed with
    +      this work for additional information regarding copyright ownership.
    +      The ASF licenses this file to You under the Apache License, Version 2.0
    +      (the "License"); you may not use this file except in compliance with
    +      the License.  You may obtain a copy of the License at
    +          http://www.apache.org/licenses/LICENSE-2.0
    +      Unless required by applicable law or agreed to in writing, software
    +      distributed under the License is distributed on an "AS IS" BASIS,
    +      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +      See the License for the specific language governing permissions and
    +      limitations under the License.
    +    -->
    +    <modelVersion>4.0.0</modelVersion>
    +    <parent>
    +        <groupId>org.apache.nifi</groupId>
    +        <artifactId>nifi-flume-bundle</artifactId>
    +        <version>0.1.0-incubating-SNAPSHOT</version>
    +    </parent>
    +    <artifactId>nifi-flume-processors</artifactId>
    +    <packaging>jar</packaging>
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-api</artifactId>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-utils</artifactId>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-processor-utils</artifactId>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-flowfile-packager</artifactId>
    +        </dependency>
    +        <dependency> 
    +            <groupId>org.apache.flume</groupId> 
    --- End diff --
    
    nit: trailing whitespace


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joey <gi...@git.apache.org>.
GitHub user joey commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-110790585
  
    @rdblue yes, I need to extend `AbstractSessionFactoryProcessor` instead of `AbstractProcessor`. The current problem that I'm seeing is that my source continues to run after the processor is stopped. I'm looking to see if this is a bug in Flume or a problem with how I'm using the Flume API.
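
The symptom described above (a source that keeps running after its processor is stopped) can be illustrated with a minimal, self-contained sketch. It uses only the JDK and is not the Flume or NiFi API: `SourceLifecycleSketch`, `start()`, `stop()`, and `pollCount()` are hypothetical names chosen for illustration. The assumption is that the source polls on its own worker thread, so the stop hook has to both clear a flag and wait for that thread to exit.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a polling source; not the Flume or NiFi API. */
final class SourceLifecycleSketch {

    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicLong polls = new AtomicLong();
    private ExecutorService executor;

    /** Would be called when the owning processor is scheduled. */
    public synchronized void start() {
        if (!running.compareAndSet(false, true)) {
            return; // already started
        }
        executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> {
            // Poll until stop() clears the flag or interrupts this thread.
            while (running.get() && !Thread.currentThread().isInterrupted()) {
                polls.incrementAndGet();
                try {
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt and exit
                }
            }
        });
    }

    /** Would be called when the owning processor is stopped; true if the worker exited. */
    public synchronized boolean stop() throws InterruptedException {
        running.set(false);
        if (executor == null) {
            return true; // never started
        }
        executor.shutdownNow(); // interrupts the worker if it is sleeping
        boolean terminated = executor.awaitTermination(1, TimeUnit.SECONDS);
        executor = null;
        return terminated;
    }

    public long pollCount() {
        return polls.get();
    }

    public static void main(String[] args) throws Exception {
        SourceLifecycleSketch source = new SourceLifecycleSketch();
        source.start();
        Thread.sleep(100);
        boolean stopped = source.stop();
        long after = source.pollCount();
        Thread.sleep(100);
        System.out.println("worker exited: " + stopped);
        System.out.println("polls unchanged after stop: " + (after == source.pollCount()));
    }
}
```

If `stop()` only cleared the flag without waiting for the worker, a caller could observe polling after the stop call returned, which is one way the behavior described above could arise; whether that is what happens in Flume here is not established by this sketch.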


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joewitt <gi...@git.apache.org>.
GitHub user joewitt commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/51#discussion_r31841291
  
    --- Diff: nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java ---
    @@ -0,0 +1,165 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.flume;
    +
    +import java.io.File;
    +import static org.junit.Assert.assertEquals;
    +
    +import java.io.FileInputStream;
    +import java.io.FilenameFilter;
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import org.apache.commons.io.filefilter.HiddenFileFilter;
    +import org.apache.flume.sink.NullSink;
    +import org.apache.flume.source.AvroSource;
    +
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.util.MockProcessContext;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.apache.nifi.util.file.FileUtils;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class FlumeSinkProcessorTest {
    +
    +  private static final Logger logger =
    +      LoggerFactory.getLogger(FlumeSinkProcessorTest.class);
    +  
    +    @Test
    +    public void testValidators() {
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
    +        Collection<ValidationResult> results;
    +        ProcessContext pc;
    +
    +        results = new HashSet<>();
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because Sink Type is required"));
    +        }
    +
    +        // non-existent class
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, "invalid.class.name");
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because unable to load sink"));
    +        }
    +
    +        // class doesn't implement Sink
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, AvroSource.class.getName());
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(1, results.size());
    +        for (ValidationResult vr : results) {
    +            logger.error(vr.toString());
    +            Assert.assertTrue(vr.toString().contains("is invalid because unable to create sink"));
    +        }
    +
    +        results = new HashSet<>();
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
    +        runner.enqueue(new byte[0]);
    +        pc = runner.getProcessContext();
    +        if (pc instanceof MockProcessContext) {
    +            results = ((MockProcessContext) pc).validate();
    +        }
    +        Assert.assertEquals(0, results.size());
    +    }
    +
    +
    +    @Test
    +    public void testNullSink() throws IOException {
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
    +        FileInputStream fis = new FileInputStream("src/test/resources/testdata/records.txt");
    +        Map<String, String> attributes = new HashMap<>();
    +        attributes.put(CoreAttributes.FILENAME.key(), "records.txt");
    +        runner.enqueue(fis, attributes);
    +        runner.run();
    +        fis.close();
    +    }
    +
    +    @Test
    +    public void testBatchSize() throws IOException {
    +        TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
    +        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
    +        runner.setProperty(FlumeSinkProcessor.BATCH_SIZE, "1000");
    +        runner.setProperty(FlumeSinkProcessor.FLUME_CONFIG,
    +            "tier1.sinks.sink-1.batchSize = 1000\n");
    +        for (int i = 0; i < 100000; i++) {
    +          runner.enqueue(String.valueOf(i).getBytes());
    +        }
    +        runner.run();
    +    }
    +    
    +    @Test
    +    public void testHdfsSink() throws IOException {
    +        File destDir = new File("target/hdfs");
    --- End diff --
    
    Mmm... yeah, it would be preferable for 'clean' to do the job.


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joey <gi...@git.apache.org>.
GitHub user joey commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-109440352
  
    This new commit addresses all of the review feedback except the README. I didn't do a rebase, so this patch probably doesn't apply as is. I wanted to post this for feedback separately from the rebase, which I'll do next.
    
    I'm not sure about the README. We don't want to get into a situation where we're trying to document all of the features provided by Flume. I think if we add something, it should be a pointer to the Flume documentation for the version of Flume that we're using.
    
    Will that be sufficient?


---

[GitHub] incubator-nifi pull request: NIFI-589: Add processors that can run...

Posted by joey <gi...@git.apache.org>.
GitHub user joey commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/51#issuecomment-121399156
  
    Thanks @mcgilman!


---