You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/07/14 23:17:28 UTC

[10/13] incubator-nifi git commit: More review feedback from Ryan:

More review feedback from Ryan:

* Removed unnecessary call to configure our channel
* Removed call to context.yield() when Flume reports a backoff
* Handled the session factory changing when using an event-driven source.

Signed-off-by: Matt Gilman <ma...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/16134a2d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/16134a2d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/16134a2d

Branch: refs/heads/develop
Commit: 16134a2dfedf2f05f7d798082bfd86a5a13bda5a
Parents: c4dd1e6
Author: Joey Echeverria <jo...@gmail.com>
Authored: Wed Jun 10 15:08:03 2015 -0700
Committer: Matt Gilman <ma...@gmail.com>
Committed: Tue Jul 14 14:50:17 2015 -0400

----------------------------------------------------------------------
 .../org/apache/nifi/processors/flume/FlumeSinkProcessor.java   | 5 +----
 .../org/apache/nifi/processors/flume/FlumeSourceProcessor.java | 6 +++++-
 2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/16134a2d/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java
index 9ec1b07..2d8506d 100644
--- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java
+++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java
@@ -104,7 +104,6 @@ public class FlumeSinkProcessor extends AbstractFlumeProcessor {
     public void onScheduled(final SchedulingContext context) {
         try {
             channel = new NifiSinkSessionChannel(SUCCESS, FAILURE);
-            Configurables.configure(channel, new Context());
             channel.start();
 
             sink = SINK_FACTORY.create(context.getProperty(SOURCE_NAME).getValue(),
@@ -135,9 +134,7 @@ public class FlumeSinkProcessor extends AbstractFlumeProcessor {
 
         channel.setSession(session);
         try {
-            if (sink.process() == Sink.Status.BACKOFF) {
-                context.yield();
-            }
+            sink.process();
         } catch (EventDeliveryException ex) {
             throw new ProcessException("Flume event delivery failed", ex);
         }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/16134a2d/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java
index 1ebf05c..55b1f2f 100644
--- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java
+++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java
@@ -165,7 +165,11 @@ public class FlumeSourceProcessor extends AbstractFlumeProcessor {
             super.onTrigger(context, sessionFactory);
         } else if (source instanceof EventDrivenSource) {
             ProcessSessionFactory old = sessionFactoryRef.getAndSet(sessionFactory);
-            if (old == null) {
+            if (old != sessionFactory) {
+                if (runnerRef.get() != null) {
+                    stopped();
+                }
+
                 runnerRef.set(new EventDrivenSourceRunner());
                 eventDrivenSourceChannelRef.set(new NifiSessionFactoryChannel(sessionFactoryRef.get(), SUCCESS));
                 eventDrivenSourceChannelRef.get().start();