You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/07/14 23:17:22 UTC
[04/13] incubator-nifi git commit: Added the sink's relationships to
the relationship set. Added error checkign and logging for sink/source
creation. Fixed an issue with transaction managemetn in the sink. Reformatted
per coding standard.
Added the sink's relationships to the relationship set. Added error checkign and logging for sink/source creation. Fixed an issue with transaction managemetn in the sink. Reformatted per coding standard.
Signed-off-by: Matt Gilman <ma...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/cf29029a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/cf29029a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/cf29029a
Branch: refs/heads/develop
Commit: cf29029a4db4cc606b417889296abb97e460d586
Parents: b251ab4
Author: Joey Echeverria <jo...@cloudera.com>
Authored: Fri Jan 30 14:21:21 2015 -0500
Committer: Matt Gilman <ma...@gmail.com>
Committed: Tue Jul 14 14:50:16 2015 -0400
----------------------------------------------------------------------
nifi/nifi-assembly/pom.xml | 3 +
.../nifi-flume-bundle/nifi-flume-nar/pom.xml | 5 +
.../processors/flume/FlumeSinkProcessor.java | 218 ++++++++--------
.../processors/flume/FlumeSourceProcessor.java | 253 ++++++++++---------
.../flume/FlumeSinkProcessorTest.java | 2 -
nifi/pom.xml | 5 +
6 files changed, 257 insertions(+), 229 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cf29029a/nifi/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-assembly/pom.xml b/nifi/nifi-assembly/pom.xml
index c679d22..4f4879f 100644
--- a/nifi/nifi-assembly/pom.xml
+++ b/nifi/nifi-assembly/pom.xml
@@ -165,6 +165,9 @@ language governing permissions and limitations under the License. -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kite-nar</artifactId>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-flume-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cf29029a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml
index c5333b6..dff440e 100644
--- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml
@@ -27,5 +27,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-flume-processors</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-hadoop-libraries-nar</artifactId>
+ <type>nar</type>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cf29029a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java
index 4603d18..fc97ae8 100644
--- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java
+++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java
@@ -27,6 +27,10 @@ import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
@@ -36,122 +40,122 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.SchedulingContext;
-import org.apache.nifi.processor.annotation.CapabilityDescription;
-import org.apache.nifi.processor.annotation.OnScheduled;
-import org.apache.nifi.processor.annotation.OnUnscheduled;
-import org.apache.nifi.processor.annotation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.flume.util.FlowFileEvent;
/**
- * This processor runs a Flume sink
+ * This processor runs a Flume sink
*/
-@Tags({"flume", "hadoop", "get", "sink" })
+@Tags({"flume", "hadoop", "get", "sink"})
@CapabilityDescription("Generate FlowFile data from a Flume sink")
public class FlumeSinkProcessor extends AbstractFlumeProcessor {
- private Sink sink;
- private MemoryChannel channel;
-
- public static final PropertyDescriptor SINK_TYPE = new PropertyDescriptor.Builder()
- .name("Sink Type")
- .description("The fully-qualified name of the Sink class")
- .required(true)
- .addValidator(createSinkValidator())
- .build();
- public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder()
- .name("Agent Name")
- .description("The name of the agent used in the Flume sink configuration")
- .required(true)
- .defaultValue("tier1")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
- public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder()
- .name("Sink Name")
- .description("The name of the sink used in the Flume sink configuration")
- .required(true)
- .defaultValue("sink-1")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
- public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder()
- .name("Flume Configuration")
- .description("The Flume configuration for the sink copied from the flume.properties file")
- .required(true)
- .defaultValue("")
- .addValidator(Validator.VALID)
- .build();
-
- public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
- public static final Relationship FAILURE = new Relationship.Builder().name("failure").build();
-
- private List<PropertyDescriptor> descriptors;
- private Set<Relationship> relationships;
-
-
- @Override
- protected void init(final ProcessorInitializationContext context) {
- this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG);
- this.relationships = ImmutableSet.of();
- }
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return descriptors;
- }
-
- @Override
- public Set<Relationship> getRelationships() {
- return relationships;
- }
-
- @OnScheduled
- public void onScheduled(final SchedulingContext context) {
- channel = new MemoryChannel();
- Configurables.configure(channel, new Context());
- channel.start();
-
- sink = SINK_FACTORY.create(context.getProperty(SOURCE_NAME).getValue(),
- context.getProperty(SINK_TYPE).getValue());
- sink.setChannel(channel);
-
- String flumeConfig = context.getProperty(FLUME_CONFIG).getValue();
- String agentName = context.getProperty(AGENT_NAME).getValue();
- String sinkName = context.getProperty(SOURCE_NAME).getValue();
- Configurables.configure(sink,
- getFlumeSinkContext(flumeConfig, agentName, sinkName) );
-
- sink.start();
- }
-
- @OnUnscheduled
- public void unScheduled() {
- sink.stop();
- channel.stop();
- }
-
- @Override
- public void onTrigger(final ProcessContext context,
- final ProcessSession session) throws ProcessException {
- FlowFile flowFile = session.get();
-
- Transaction transaction = channel.getTransaction();
- try {
- transaction.begin();
- channel.put(new FlowFileEvent(flowFile, session));
- transaction.commit();
- } catch (Throwable th) {
- transaction.rollback();
- throw Throwables.propagate(th);
- } finally {
- transaction.close();
+ private Sink sink;
+ private MemoryChannel channel;
+
+ public static final PropertyDescriptor SINK_TYPE = new PropertyDescriptor.Builder()
+ .name("Sink Type")
+ .description("The fully-qualified name of the Sink class")
+ .required(true)
+ .addValidator(createSinkValidator())
+ .build();
+ public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder()
+ .name("Agent Name")
+ .description("The name of the agent used in the Flume sink configuration")
+ .required(true)
+ .defaultValue("tier1")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder()
+ .name("Sink Name")
+ .description("The name of the sink used in the Flume sink configuration")
+ .required(true)
+ .defaultValue("sink-1")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder()
+ .name("Flume Configuration")
+ .description("The Flume configuration for the sink copied from the flume.properties file")
+ .required(true)
+ .defaultValue("")
+ .addValidator(Validator.VALID)
+ .build();
+
+ public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
+ public static final Relationship FAILURE = new Relationship.Builder().name("failure").build();
+
+ private List<PropertyDescriptor> descriptors;
+ private Set<Relationship> relationships;
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG);
+ this.relationships = ImmutableSet.of(SUCCESS, FAILURE);
}
- try {
- sink.process();
- session.transfer(flowFile, SUCCESS);
- } catch (EventDeliveryException ex) {
- session.transfer(flowFile, FAILURE);
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return descriptors;
}
- }
-}
\ No newline at end of file
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @OnScheduled
+ public void onScheduled(final SchedulingContext context) {
+ try {
+ channel = new MemoryChannel();
+ Configurables.configure(channel, new Context());
+ channel.start();
+
+ sink = SINK_FACTORY.create(context.getProperty(SOURCE_NAME).getValue(),
+ context.getProperty(SINK_TYPE).getValue());
+ sink.setChannel(channel);
+
+ String flumeConfig = context.getProperty(FLUME_CONFIG).getValue();
+ String agentName = context.getProperty(AGENT_NAME).getValue();
+ String sinkName = context.getProperty(SOURCE_NAME).getValue();
+ Configurables.configure(sink,
+ getFlumeSinkContext(flumeConfig, agentName, sinkName));
+
+ sink.start();
+ } catch (Throwable th) {
+ getLogger().error("Error creating sink", th);
+ throw Throwables.propagate(th);
+ }
+ }
+
+ @OnUnscheduled
+ public void unScheduled() {
+ sink.stop();
+ channel.stop();
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context,
+ final ProcessSession session) throws ProcessException {
+ FlowFile flowFile = session.get();
+
+ Transaction transaction = channel.getTransaction();
+ try {
+ transaction.begin();
+ channel.put(new FlowFileEvent(flowFile, session));
+ transaction.commit();
+ } catch (Throwable th) {
+ transaction.rollback();
+ throw Throwables.propagate(th);
+ } finally {
+ transaction.close();
+ }
+
+ try {
+ sink.process();
+ session.transfer(flowFile, SUCCESS);
+ } catch (EventDeliveryException ex) {
+ session.transfer(flowFile, FAILURE);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cf29029a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java
index 8b8388c..19551e6 100644
--- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java
+++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.flume;
+import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.util.List;
@@ -32,6 +33,10 @@ import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.source.EventDrivenSourceRunner;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
@@ -40,139 +45,147 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.SchedulingContext;
-import org.apache.nifi.processor.annotation.CapabilityDescription;
-import org.apache.nifi.processor.annotation.OnScheduled;
-import org.apache.nifi.processor.annotation.OnUnscheduled;
-import org.apache.nifi.processor.annotation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
/**
- * This processor runs a Flume source
+ * This processor runs a Flume source
*/
-@Tags({"flume", "hadoop", "get", "source" })
+@Tags({"flume", "hadoop", "get", "source"})
@CapabilityDescription("Generate FlowFile data from a Flume source")
public class FlumeSourceProcessor extends AbstractFlumeProcessor {
-
- private Source source;
- private SourceRunner runner;
- private MemoryChannel channel;
-
- public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder()
- .name("Source Type")
- .description("The fully-qualified name of the Source class")
- .required(true)
- .addValidator(createSourceValidator())
- .build();
- public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder()
- .name("Agent Name")
- .description("The name of the agent used in the Flume source configuration")
- .required(true)
- .defaultValue("tier1")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
- public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder()
- .name("Source Name")
- .description("The name of the source used in the Flume source configuration")
- .required(true)
- .defaultValue("src-1")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
- public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder()
- .name("Flume Configuration")
- .description("The Flume configuration for the source copied from the flume.properties file")
- .required(true)
- .defaultValue("")
- .addValidator(Validator.VALID)
- .build();
-
- public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
-
- private List<PropertyDescriptor> descriptors;
- private Set<Relationship> relationships;
-
-
- @Override
- protected void init(final ProcessorInitializationContext context) {
- this.descriptors = ImmutableList.of(SOURCE_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG);
- this.relationships = ImmutableSet.of(SUCCESS);
- }
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return descriptors;
- }
-
- @Override
- public Set<Relationship> getRelationships() {
- return relationships;
- }
-
- @OnScheduled
- public void onScheduled(final SchedulingContext context) {
- source = SOURCE_FACTORY.create(
- context.getProperty(SOURCE_NAME).getValue(),
- context.getProperty(SOURCE_TYPE).getValue());
-
- String flumeConfig = context.getProperty(FLUME_CONFIG).getValue();
- String agentName = context.getProperty(AGENT_NAME).getValue();
- String sourceName = context.getProperty(SOURCE_NAME).getValue();
- Configurables.configure(source,
- getFlumeSourceContext(flumeConfig, agentName, sourceName) );
-
- if (source instanceof EventDrivenSource) {
- runner = new EventDrivenSourceRunner();
- channel = new MemoryChannel();
- Configurables.configure(channel, new Context());
- channel.start();
- source.setChannelProcessor(new ChannelProcessor(new NifiChannelSelector(channel)));
- runner.setSource(source);
- runner.start();
- }
- }
-
- @OnUnscheduled
- public void unScheduled() {
- if (runner != null) {
- runner.stop();
+
+ private Source source;
+ private SourceRunner runner;
+ private MemoryChannel channel;
+
+ public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder()
+ .name("Source Type")
+ .description("The fully-qualified name of the Source class")
+ .required(true)
+ .addValidator(createSourceValidator())
+ .build();
+ public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder()
+ .name("Agent Name")
+ .description("The name of the agent used in the Flume source configuration")
+ .required(true)
+ .defaultValue("tier1")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder()
+ .name("Source Name")
+ .description("The name of the source used in the Flume source configuration")
+ .required(true)
+ .defaultValue("src-1")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder()
+ .name("Flume Configuration")
+ .description("The Flume configuration for the source copied from the flume.properties file")
+ .required(true)
+ .defaultValue("")
+ .addValidator(Validator.VALID)
+ .build();
+
+ public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
+
+ private List<PropertyDescriptor> descriptors;
+ private Set<Relationship> relationships;
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ this.descriptors = ImmutableList.of(SOURCE_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG);
+ this.relationships = ImmutableSet.of(SUCCESS);
}
- if (channel != null) {
- channel.stop();
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return descriptors;
}
- }
-
- @Override
- public void onTrigger(final ProcessContext context,
- final ProcessSession session) throws ProcessException {
- if (source instanceof EventDrivenSource) {
- onEventDrivenTrigger(context, session);
- } else if (source instanceof PollableSource) {
- onPollableTrigger((PollableSource)source, context, session);
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
}
- }
-
- public void onPollableTrigger(final PollableSource pollableSource,
- final ProcessContext context, final ProcessSession session)
- throws ProcessException {
- try {
- pollableSource.setChannelProcessor(new ChannelProcessor(
- new NifiChannelSelector(new NifiChannel(session, SUCCESS))));
- pollableSource.start();
- pollableSource.process();
- pollableSource.stop();
- } catch (EventDeliveryException ex) {
- throw new ProcessException("Error processing pollable source", ex);
+
+ @OnScheduled
+ public void onScheduled(final SchedulingContext context) {
+ try {
+ source = SOURCE_FACTORY.create(
+ context.getProperty(SOURCE_NAME).getValue(),
+ context.getProperty(SOURCE_TYPE).getValue());
+
+ String flumeConfig = context.getProperty(FLUME_CONFIG).getValue();
+ String agentName = context.getProperty(AGENT_NAME).getValue();
+ String sourceName = context.getProperty(SOURCE_NAME).getValue();
+ Configurables.configure(source,
+ getFlumeSourceContext(flumeConfig, agentName, sourceName));
+
+ if (source instanceof EventDrivenSource) {
+ runner = new EventDrivenSourceRunner();
+ channel = new MemoryChannel();
+ Configurables.configure(channel, new Context());
+ channel.start();
+ source.setChannelProcessor(new ChannelProcessor(new NifiChannelSelector(channel)));
+ runner.setSource(source);
+ runner.start();
+ }
+ } catch (Throwable th) {
+ getLogger().error("Error creating source", th);
+ throw Throwables.propagate(th);
+ }
}
- }
- public void onEventDrivenTrigger(final ProcessContext context, final ProcessSession session) {
- Transaction transaction = channel.getTransaction();
- transaction.begin();
+ @OnUnscheduled
+ public void unScheduled() {
+ if (runner != null) {
+ runner.stop();
+ }
+ if (channel != null) {
+ channel.stop();
+ }
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context,
+ final ProcessSession session) throws ProcessException {
+ if (source instanceof EventDrivenSource) {
+ onEventDrivenTrigger(context, session);
+ } else if (source instanceof PollableSource) {
+ onPollableTrigger((PollableSource) source, context, session);
+ }
+ }
+
+ public void onPollableTrigger(final PollableSource pollableSource,
+ final ProcessContext context, final ProcessSession session)
+ throws ProcessException {
+ try {
+ pollableSource.setChannelProcessor(new ChannelProcessor(
+ new NifiChannelSelector(new NifiChannel(session, SUCCESS))));
+ pollableSource.start();
+ pollableSource.process();
+ pollableSource.stop();
+ } catch (EventDeliveryException ex) {
+ throw new ProcessException("Error processing pollable source", ex);
+ }
+ }
- Event event = channel.take();
- if (event != null) {
- transferEvent(event, session, SUCCESS);
+ public void onEventDrivenTrigger(final ProcessContext context, final ProcessSession session) {
+ Transaction transaction = channel.getTransaction();
+ transaction.begin();
+
+ try {
+ Event event = channel.take();
+ if (event != null) {
+ transferEvent(event, session, SUCCESS);
+ }
+ transaction.commit();
+ } catch (Throwable th) {
+ transaction.rollback();
+ throw Throwables.propagate(th);
+ } finally {
+ transaction.close();
+ }
}
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cf29029a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java
index 4f2cef7..8d40cb6 100644
--- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java
+++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java
@@ -18,7 +18,6 @@ package org.apache.nifi.processors.flume;
import java.io.File;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
import java.io.FileInputStream;
import java.io.FilenameFilter;
@@ -40,7 +39,6 @@ import org.apache.nifi.util.TestRunners;
import org.apache.nifi.util.file.FileUtils;
import org.junit.Assert;
-import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cf29029a/nifi/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/pom.xml b/nifi/pom.xml
index 0c71ba8..422e1aa 100644
--- a/nifi/pom.xml
+++ b/nifi/pom.xml
@@ -805,6 +805,11 @@
<artifactId>nifi-geo-nar</artifactId>
<version>0.2.0-incubating-SNAPSHOT</version>
<type>nar</type>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-flume-nar</artifactId>
+ <version>0.0.1-incubating-SNAPSHOT</version>
+ <type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>