You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/07/14 23:17:22 UTC

[04/13] incubator-nifi git commit: Added the sink's relationships to the relationship set. Added error checking and logging for sink/source creation. Fixed an issue with transaction management in the sink. Reformatted per coding standard.

Added the sink's relationships to the relationship set. Added error checking and logging for sink/source creation. Fixed an issue with transaction management in the sink. Reformatted per coding standard.

Signed-off-by: Matt Gilman <ma...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/cf29029a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/cf29029a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/cf29029a

Branch: refs/heads/develop
Commit: cf29029a4db4cc606b417889296abb97e460d586
Parents: b251ab4
Author: Joey Echeverria <jo...@cloudera.com>
Authored: Fri Jan 30 14:21:21 2015 -0500
Committer: Matt Gilman <ma...@gmail.com>
Committed: Tue Jul 14 14:50:16 2015 -0400

----------------------------------------------------------------------
 nifi/nifi-assembly/pom.xml                      |   3 +
 .../nifi-flume-bundle/nifi-flume-nar/pom.xml    |   5 +
 .../processors/flume/FlumeSinkProcessor.java    | 218 ++++++++--------
 .../processors/flume/FlumeSourceProcessor.java  | 253 ++++++++++---------
 .../flume/FlumeSinkProcessorTest.java           |   2 -
 nifi/pom.xml                                    |   5 +
 6 files changed, 257 insertions(+), 229 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cf29029a/nifi/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-assembly/pom.xml b/nifi/nifi-assembly/pom.xml
index c679d22..4f4879f 100644
--- a/nifi/nifi-assembly/pom.xml
+++ b/nifi/nifi-assembly/pom.xml
@@ -165,6 +165,9 @@ language governing permissions and limitations under the License. -->
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-kite-nar</artifactId>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-flume-nar</artifactId>
             <type>nar</type>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cf29029a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml
index c5333b6..dff440e 100644
--- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml
@@ -27,5 +27,10 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-flume-processors</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-hadoop-libraries-nar</artifactId>
+            <type>nar</type>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cf29029a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java
index 4603d18..fc97ae8 100644
--- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java
+++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java
@@ -27,6 +27,10 @@ import org.apache.flume.Sink;
 import org.apache.flume.Transaction;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.Validator;
@@ -36,122 +40,122 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.SchedulingContext;
-import org.apache.nifi.processor.annotation.CapabilityDescription;
-import org.apache.nifi.processor.annotation.OnScheduled;
-import org.apache.nifi.processor.annotation.OnUnscheduled;
-import org.apache.nifi.processor.annotation.Tags;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.flume.util.FlowFileEvent;
 
 /**
- * This processor runs a Flume sink 
+ * This processor runs a Flume sink
  */
-@Tags({"flume", "hadoop", "get", "sink" })
+@Tags({"flume", "hadoop", "get", "sink"})
 @CapabilityDescription("Generate FlowFile data from a Flume sink")
 public class FlumeSinkProcessor extends AbstractFlumeProcessor {
 
-  private Sink sink;
-  private MemoryChannel channel;
-
-  public static final PropertyDescriptor SINK_TYPE = new PropertyDescriptor.Builder()
-      .name("Sink Type")
-      .description("The fully-qualified name of the Sink class")
-      .required(true)
-      .addValidator(createSinkValidator())
-      .build();
-  public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder()
-      .name("Agent Name")
-      .description("The name of the agent used in the Flume sink configuration")
-      .required(true)
-      .defaultValue("tier1")
-      .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-      .build();
-  public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder()
-      .name("Sink Name")
-      .description("The name of the sink used in the Flume sink configuration")
-      .required(true)
-      .defaultValue("sink-1")
-      .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-      .build();
-  public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder()
-      .name("Flume Configuration")
-      .description("The Flume configuration for the sink copied from the flume.properties file")
-      .required(true)
-      .defaultValue("")
-      .addValidator(Validator.VALID)
-      .build();
-
-  public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
-  public static final Relationship FAILURE = new Relationship.Builder().name("failure").build();
-
-  private List<PropertyDescriptor> descriptors;
-  private Set<Relationship> relationships;
-
-
-  @Override
-  protected void init(final ProcessorInitializationContext context) {
-    this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG);
-    this.relationships = ImmutableSet.of();
-  }
-
-  @Override
-  protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-    return descriptors;
-  }
-
-  @Override
-  public Set<Relationship> getRelationships() {
-    return relationships;
-  }
-
-  @OnScheduled
-  public void onScheduled(final SchedulingContext context) {
-    channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
-    channel.start();
-
-    sink = SINK_FACTORY.create(context.getProperty(SOURCE_NAME).getValue(),
-        context.getProperty(SINK_TYPE).getValue());
-    sink.setChannel(channel);
-
-    String flumeConfig = context.getProperty(FLUME_CONFIG).getValue();
-    String agentName = context.getProperty(AGENT_NAME).getValue();
-    String sinkName = context.getProperty(SOURCE_NAME).getValue();
-    Configurables.configure(sink,
-        getFlumeSinkContext(flumeConfig, agentName, sinkName) );
-
-    sink.start();
-  }
-
-  @OnUnscheduled
-  public void unScheduled() {
-    sink.stop();
-    channel.stop();
-  }
-
-  @Override
-  public void onTrigger(final ProcessContext context,
-      final ProcessSession session) throws ProcessException {
-    FlowFile flowFile = session.get();
-
-    Transaction transaction = channel.getTransaction();
-    try {
-      transaction.begin();
-      channel.put(new FlowFileEvent(flowFile, session));
-      transaction.commit();
-    } catch (Throwable th) {
-      transaction.rollback();
-      throw Throwables.propagate(th);
-    } finally {
-      transaction.close();
+    private Sink sink;
+    private MemoryChannel channel;
+
+    public static final PropertyDescriptor SINK_TYPE = new PropertyDescriptor.Builder()
+            .name("Sink Type")
+            .description("The fully-qualified name of the Sink class")
+            .required(true)
+            .addValidator(createSinkValidator())
+            .build();
+    public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder()
+            .name("Agent Name")
+            .description("The name of the agent used in the Flume sink configuration")
+            .required(true)
+            .defaultValue("tier1")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder()
+            .name("Sink Name")
+            .description("The name of the sink used in the Flume sink configuration")
+            .required(true)
+            .defaultValue("sink-1")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder()
+            .name("Flume Configuration")
+            .description("The Flume configuration for the sink copied from the flume.properties file")
+            .required(true)
+            .defaultValue("")
+            .addValidator(Validator.VALID)
+            .build();
+
+    public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
+    public static final Relationship FAILURE = new Relationship.Builder().name("failure").build();
+
+    private List<PropertyDescriptor> descriptors;
+    private Set<Relationship> relationships;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG);
+        this.relationships = ImmutableSet.of(SUCCESS, FAILURE);
     }
 
-    try {
-      sink.process();
-      session.transfer(flowFile, SUCCESS);
-    } catch (EventDeliveryException ex) {
-      session.transfer(flowFile, FAILURE);
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
     }
-  }
-}
\ No newline at end of file
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @OnScheduled
+    public void onScheduled(final SchedulingContext context) {
+        try {
+            channel = new MemoryChannel();
+            Configurables.configure(channel, new Context());
+            channel.start();
+
+            sink = SINK_FACTORY.create(context.getProperty(SOURCE_NAME).getValue(),
+                    context.getProperty(SINK_TYPE).getValue());
+            sink.setChannel(channel);
+
+            String flumeConfig = context.getProperty(FLUME_CONFIG).getValue();
+            String agentName = context.getProperty(AGENT_NAME).getValue();
+            String sinkName = context.getProperty(SOURCE_NAME).getValue();
+            Configurables.configure(sink,
+                    getFlumeSinkContext(flumeConfig, agentName, sinkName));
+
+            sink.start();
+        } catch (Throwable th) {
+            getLogger().error("Error creating sink", th);
+            throw Throwables.propagate(th);
+        }
+    }
+
+    @OnUnscheduled
+    public void unScheduled() {
+        sink.stop();
+        channel.stop();
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context,
+            final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+
+        Transaction transaction = channel.getTransaction();
+        try {
+            transaction.begin();
+            channel.put(new FlowFileEvent(flowFile, session));
+            transaction.commit();
+        } catch (Throwable th) {
+            transaction.rollback();
+            throw Throwables.propagate(th);
+        } finally {
+            transaction.close();
+        }
+
+        try {
+            sink.process();
+            session.transfer(flowFile, SUCCESS);
+        } catch (EventDeliveryException ex) {
+            session.transfer(flowFile, FAILURE);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cf29029a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java
index 8b8388c..19551e6 100644
--- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java
+++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.processors.flume;
 
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import java.util.List;
@@ -32,6 +33,10 @@ import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.source.EventDrivenSourceRunner;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.Validator;
@@ -40,139 +45,147 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.SchedulingContext;
-import org.apache.nifi.processor.annotation.CapabilityDescription;
-import org.apache.nifi.processor.annotation.OnScheduled;
-import org.apache.nifi.processor.annotation.OnUnscheduled;
-import org.apache.nifi.processor.annotation.Tags;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 
 /**
- * This processor runs a Flume source 
+ * This processor runs a Flume source
  */
-@Tags({"flume", "hadoop", "get", "source" })
+@Tags({"flume", "hadoop", "get", "source"})
 @CapabilityDescription("Generate FlowFile data from a Flume source")
 public class FlumeSourceProcessor extends AbstractFlumeProcessor {
-  
-  private Source source;
-  private SourceRunner runner;
-  private MemoryChannel channel;
-
-  public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder()
-      .name("Source Type")
-      .description("The fully-qualified name of the Source class")
-      .required(true)
-      .addValidator(createSourceValidator())
-      .build();
-  public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder()
-      .name("Agent Name")
-      .description("The name of the agent used in the Flume source configuration")
-      .required(true)
-      .defaultValue("tier1")
-      .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-      .build();
-  public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder()
-      .name("Source Name")
-      .description("The name of the source used in the Flume source configuration")
-      .required(true)
-      .defaultValue("src-1")
-      .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-      .build();
-  public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder()
-      .name("Flume Configuration")
-      .description("The Flume configuration for the source copied from the flume.properties file")
-      .required(true)
-      .defaultValue("")
-      .addValidator(Validator.VALID)
-      .build();
-
-  public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
-
-  private List<PropertyDescriptor> descriptors;
-  private Set<Relationship> relationships;
-
-
-  @Override
-  protected void init(final ProcessorInitializationContext context) {
-    this.descriptors = ImmutableList.of(SOURCE_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG);
-    this.relationships = ImmutableSet.of(SUCCESS);
-  }
-
-  @Override
-  protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-    return descriptors;
-  }
-
-  @Override
-  public Set<Relationship> getRelationships() {
-    return relationships;
-  }
-
-  @OnScheduled
-  public void onScheduled(final SchedulingContext context) {
-    source = SOURCE_FACTORY.create(
-        context.getProperty(SOURCE_NAME).getValue(),
-        context.getProperty(SOURCE_TYPE).getValue());
-    
-    String flumeConfig = context.getProperty(FLUME_CONFIG).getValue();
-    String agentName = context.getProperty(AGENT_NAME).getValue();
-    String sourceName = context.getProperty(SOURCE_NAME).getValue();
-    Configurables.configure(source,
-        getFlumeSourceContext(flumeConfig, agentName, sourceName) );
-
-    if (source instanceof EventDrivenSource) {
-      runner = new EventDrivenSourceRunner();
-      channel = new MemoryChannel();
-      Configurables.configure(channel, new Context());
-      channel.start();
-      source.setChannelProcessor(new ChannelProcessor(new NifiChannelSelector(channel)));
-      runner.setSource(source);
-      runner.start();
-    } 
-  }
-
-  @OnUnscheduled
-  public void unScheduled() {
-    if (runner != null) {
-      runner.stop();
+
+    private Source source;
+    private SourceRunner runner;
+    private MemoryChannel channel;
+
+    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder()
+            .name("Source Type")
+            .description("The fully-qualified name of the Source class")
+            .required(true)
+            .addValidator(createSourceValidator())
+            .build();
+    public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder()
+            .name("Agent Name")
+            .description("The name of the agent used in the Flume source configuration")
+            .required(true)
+            .defaultValue("tier1")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder()
+            .name("Source Name")
+            .description("The name of the source used in the Flume source configuration")
+            .required(true)
+            .defaultValue("src-1")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder()
+            .name("Flume Configuration")
+            .description("The Flume configuration for the source copied from the flume.properties file")
+            .required(true)
+            .defaultValue("")
+            .addValidator(Validator.VALID)
+            .build();
+
+    public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
+
+    private List<PropertyDescriptor> descriptors;
+    private Set<Relationship> relationships;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        this.descriptors = ImmutableList.of(SOURCE_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG);
+        this.relationships = ImmutableSet.of(SUCCESS);
     }
-    if (channel != null) {
-      channel.stop();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
     }
-  }
-
-  @Override
-  public void onTrigger(final ProcessContext context,
-      final ProcessSession session) throws ProcessException {
-    if (source instanceof EventDrivenSource) {
-      onEventDrivenTrigger(context, session);
-    } else if (source instanceof PollableSource) {
-      onPollableTrigger((PollableSource)source, context, session);
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
     }
-  }
-
-  public void onPollableTrigger(final PollableSource pollableSource,
-      final ProcessContext context, final ProcessSession session)
-      throws ProcessException {
-    try {
-      pollableSource.setChannelProcessor(new ChannelProcessor(
-          new NifiChannelSelector(new NifiChannel(session, SUCCESS))));
-      pollableSource.start();
-      pollableSource.process();
-      pollableSource.stop();
-    } catch (EventDeliveryException ex) {
-      throw new ProcessException("Error processing pollable source", ex);
+
+    @OnScheduled
+    public void onScheduled(final SchedulingContext context) {
+        try {
+            source = SOURCE_FACTORY.create(
+                    context.getProperty(SOURCE_NAME).getValue(),
+                    context.getProperty(SOURCE_TYPE).getValue());
+
+            String flumeConfig = context.getProperty(FLUME_CONFIG).getValue();
+            String agentName = context.getProperty(AGENT_NAME).getValue();
+            String sourceName = context.getProperty(SOURCE_NAME).getValue();
+            Configurables.configure(source,
+                    getFlumeSourceContext(flumeConfig, agentName, sourceName));
+
+            if (source instanceof EventDrivenSource) {
+                runner = new EventDrivenSourceRunner();
+                channel = new MemoryChannel();
+                Configurables.configure(channel, new Context());
+                channel.start();
+                source.setChannelProcessor(new ChannelProcessor(new NifiChannelSelector(channel)));
+                runner.setSource(source);
+                runner.start();
+            }
+        } catch (Throwable th) {
+            getLogger().error("Error creating source", th);
+            throw Throwables.propagate(th);
+        }
     }
-  }
 
-  public void onEventDrivenTrigger(final ProcessContext context, final ProcessSession session) {
-    Transaction transaction = channel.getTransaction();
-    transaction.begin();
+    @OnUnscheduled
+    public void unScheduled() {
+        if (runner != null) {
+            runner.stop();
+        }
+        if (channel != null) {
+            channel.stop();
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context,
+            final ProcessSession session) throws ProcessException {
+        if (source instanceof EventDrivenSource) {
+            onEventDrivenTrigger(context, session);
+        } else if (source instanceof PollableSource) {
+            onPollableTrigger((PollableSource) source, context, session);
+        }
+    }
+
+    public void onPollableTrigger(final PollableSource pollableSource,
+            final ProcessContext context, final ProcessSession session)
+            throws ProcessException {
+        try {
+            pollableSource.setChannelProcessor(new ChannelProcessor(
+                    new NifiChannelSelector(new NifiChannel(session, SUCCESS))));
+            pollableSource.start();
+            pollableSource.process();
+            pollableSource.stop();
+        } catch (EventDeliveryException ex) {
+            throw new ProcessException("Error processing pollable source", ex);
+        }
+    }
 
-    Event event = channel.take();
-    if (event != null) {
-      transferEvent(event, session, SUCCESS);
+    public void onEventDrivenTrigger(final ProcessContext context, final ProcessSession session) {
+        Transaction transaction = channel.getTransaction();
+        transaction.begin();
+
+        try {
+            Event event = channel.take();
+            if (event != null) {
+                transferEvent(event, session, SUCCESS);
+            }
+            transaction.commit();
+        } catch (Throwable th) {
+            transaction.rollback();
+            throw Throwables.propagate(th);
+        } finally {
+            transaction.close();
+        }
     }
-  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cf29029a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java
index 4f2cef7..8d40cb6 100644
--- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java
+++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java
@@ -18,7 +18,6 @@ package org.apache.nifi.processors.flume;
 
 import java.io.File;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 import java.io.FileInputStream;
 import java.io.FilenameFilter;
@@ -40,7 +39,6 @@ import org.apache.nifi.util.TestRunners;
 import org.apache.nifi.util.file.FileUtils;
 
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cf29029a/nifi/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/pom.xml b/nifi/pom.xml
index 0c71ba8..422e1aa 100644
--- a/nifi/pom.xml
+++ b/nifi/pom.xml
@@ -805,6 +805,11 @@
                 <artifactId>nifi-geo-nar</artifactId>
                 <version>0.2.0-incubating-SNAPSHOT</version>
                 <type>nar</type>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-flume-nar</artifactId>
+                <version>0.0.1-incubating-SNAPSHOT</version>
+                <type>nar</type>
             </dependency>
             <dependency>
                 <groupId>org.apache.nifi</groupId>