You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/07/14 23:17:28 UTC
[10/13] incubator-nifi git commit: More review feedback from Ryan:
More review feedback from Ryan:
* Removed unnecessary call to configure our channel
* Removed call to context.yield() when Flume reports a backoff
* Handled the session factory changing when using an event-driven source.
Signed-off-by: Matt Gilman <ma...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/16134a2d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/16134a2d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/16134a2d
Branch: refs/heads/develop
Commit: 16134a2dfedf2f05f7d798082bfd86a5a13bda5a
Parents: c4dd1e6
Author: Joey Echeverria <jo...@gmail.com>
Authored: Wed Jun 10 15:08:03 2015 -0700
Committer: Matt Gilman <ma...@gmail.com>
Committed: Tue Jul 14 14:50:17 2015 -0400
----------------------------------------------------------------------
.../org/apache/nifi/processors/flume/FlumeSinkProcessor.java | 5 +----
.../org/apache/nifi/processors/flume/FlumeSourceProcessor.java | 6 +++++-
2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/16134a2d/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java
index 9ec1b07..2d8506d 100644
--- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java
+++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java
@@ -104,7 +104,6 @@ public class FlumeSinkProcessor extends AbstractFlumeProcessor {
public void onScheduled(final SchedulingContext context) {
try {
channel = new NifiSinkSessionChannel(SUCCESS, FAILURE);
- Configurables.configure(channel, new Context());
channel.start();
sink = SINK_FACTORY.create(context.getProperty(SOURCE_NAME).getValue(),
@@ -135,9 +134,7 @@ public class FlumeSinkProcessor extends AbstractFlumeProcessor {
channel.setSession(session);
try {
- if (sink.process() == Sink.Status.BACKOFF) {
- context.yield();
- }
+ sink.process();
} catch (EventDeliveryException ex) {
throw new ProcessException("Flume event delivery failed", ex);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/16134a2d/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java
index 1ebf05c..55b1f2f 100644
--- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java
+++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java
@@ -165,7 +165,11 @@ public class FlumeSourceProcessor extends AbstractFlumeProcessor {
super.onTrigger(context, sessionFactory);
} else if (source instanceof EventDrivenSource) {
ProcessSessionFactory old = sessionFactoryRef.getAndSet(sessionFactory);
- if (old == null) {
+ if (old != sessionFactory) {
+ if (runnerRef.get() != null) {
+ stopped();
+ }
+
runnerRef.set(new EventDrivenSourceRunner());
eventDrivenSourceChannelRef.set(new NifiSessionFactoryChannel(sessionFactoryRef.get(), SUCCESS));
eventDrivenSourceChannelRef.get().start();