Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/07/31 11:37:42 UTC

[GitHub] [nifi] tpalfy commented on a change in pull request #4411: NIFI-6312: Improved connection handling in AMQP processors

tpalfy commented on a change in pull request #4411:
URL: https://github.com/apache/nifi/pull/4411#discussion_r463554505



##########
File path: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
##########
@@ -190,33 +197,50 @@
     public final void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         AMQPResource<T> resource = resourceQueue.poll();
         if (resource == null) {
-            resource = createResource(context);
+            try {
+                resource = createResource(context);
+            } catch (Exception e) {
+                getLogger().error("Failed to initialize AMQP client", e);
+                context.yield();
+                return;
+            }
+        } else if (!resource.isAlive()) {

Review comment:
       This puts health checks in two places: _before_ and _during_ the AMQP communication. The general problem is that the resource can still "die" between the two, so the _before_ check cannot guarantee a live connection.
   
   It might be better to move all resource health checks into the _during_ phase.
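
   A rough sketch of the idea, not a proposal for the actual code: `Resource`, `send` and `trigger` are hypothetical stand-ins (they are not the NiFi or RabbitMQ API), and `IllegalStateException` is only assumed here as the signal for a lost connection. There is no `isAlive()` check before use; a dead client is detected and dropped at the point where the communication fails.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-ins for illustration only; not the NiFi or RabbitMQ API.
class DuringPhaseCheckSketch {

    /** Hypothetical pooled client whose connection can be lost at any time. */
    static class Resource {
        boolean alive = true;
        boolean closed = false;

        // Assumption: a lost connection surfaces as an exception when the client is used.
        void send(String message) {
            if (!alive) {
                throw new IllegalStateException("connection lost");
            }
        }

        void close() {
            closed = true;
        }
    }

    final BlockingQueue<Resource> pool = new ArrayBlockingQueue<>(2);

    /** Returns true if the message was sent, false if the client had to be dropped. */
    boolean trigger(String message) {
        Resource resource = pool.poll();
        if (resource == null) {
            resource = new Resource();
        }
        // No health check before use: the only check is the communication itself.
        try {
            resource.send(message);
        } catch (IllegalStateException e) {
            // The client died while pooled or while in use; both cases end up here.
            resource.close();
            return false;
        }
        // Return the healthy client to the pool, or close it if the pool is full.
        if (!pool.offer(resource)) {
            resource.close();
        }
        return true;
    }

    public static void main(String[] args) {
        DuringPhaseCheckSketch sketch = new DuringPhaseCheckSketch();
        Resource dead = new Resource();
        dead.alive = false;
        sketch.pool.offer(dead);
        System.out.println(sketch.trigger("first"));  // dead client is dropped
        System.out.println(sketch.trigger("second")); // a fresh client is created
    }
}
```

   With a single handling point, a client that dies in the pool and one that dies mid-call follow the same path, so there is no window between a check and the use.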

##########
File path: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
##########
@@ -190,33 +197,50 @@
     public final void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         AMQPResource<T> resource = resourceQueue.poll();
         if (resource == null) {
-            resource = createResource(context);
+            try {
+                resource = createResource(context);
+            } catch (Exception e) {
+                getLogger().error("Failed to initialize AMQP client", e);
+                context.yield();
+                return;
+            }
+        } else if (!resource.isAlive()) {
+            getLogger().error("AMQP client has lost connection while it was waiting in the resource pool, dropping the AMQP client.");
+            closeResource(resource);
+            return;
         }
 
         try {
             processResource(resource.getConnection(), resource.getWorker(), context, session);
-            resourceQueue.offer(resource);
-        } catch (final Exception e) {
-            try {
-                resource.close();
-            } catch (final Exception e2) {
-                e.addSuppressed(e2);
-            }
 
-            throw e;
+            if (!resourceQueue.offer(resource)) {
+                getLogger().info("Worker queue is full, closing AMQP client");
+                closeResource(resource);
+            }
+        } catch (final Exception e) {
+            getLogger().error("Failed to process message, dropping the AMQP client and yielding", e);

Review comment:
       The exception handling is inconsistent: this catch block treats every exception as an AMQP client failure.
   It would be better to distinguish AMQP client failures from other, NiFi-related failures.
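
   One possible shape for this, as a sketch under assumptions: `ClientFailureException`, `Resource`, `Work` and `trigger` are hypothetical names, not existing NiFi or AMQP types, and the policy for non-client failures (keep the client, rethrow) is only one option.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-ins for illustration only; not the NiFi or RabbitMQ API.
class FailureKindSketch {

    /** Hypothetical marker for failures of the AMQP client itself. */
    static class ClientFailureException extends RuntimeException {
        ClientFailureException(String message) {
            super(message);
        }
    }

    /** Hypothetical pooled client. */
    static class Resource {
        boolean closed = false;

        void close() {
            closed = true;
        }
    }

    /** Hypothetical unit of work performed with a client. */
    interface Work {
        void run(Resource resource);
    }

    final Deque<Resource> pool = new ArrayDeque<>();
    boolean yielded = false;

    void trigger(Resource resource, Work work) {
        try {
            work.run(resource);
            pool.offer(resource);
        } catch (ClientFailureException e) {
            // The client itself is broken: drop it and back off.
            resource.close();
            yielded = true;
        } catch (RuntimeException e) {
            // Assumed policy: the client is still usable, so keep it and
            // let the caller (the framework) deal with the failure.
            pool.offer(resource);
            throw e;
        }
    }
}
```

   The more specific `catch` has to come first; the second branch then only sees failures that did not originate in the client.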



