You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2018/09/18 17:16:43 UTC

atlas git commit: ATLAS-2877: updated notification processing to wait only before retry

Repository: atlas
Updated Branches:
  refs/heads/master 3176d1a1e -> 48e522497


ATLAS-2877: updated notification processing to wait only before retry


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/48e52249
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/48e52249
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/48e52249

Branch: refs/heads/master
Commit: 48e522497f960ac9cc54853c3cb0e9af3e0111f3
Parents: 3176d1a
Author: Madhan Neethiraj <ma...@apache.org>
Authored: Tue Sep 18 09:01:25 2018 -0700
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Tue Sep 18 09:10:59 2018 -0700

----------------------------------------------------------------------
 .../notification/NotificationHookConsumer.java  | 31 +++++++++++---------
 1 file changed, 17 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/48e52249/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index d680e4e..9f832b9 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -294,7 +294,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     class HookConsumer extends ShutdownableThread {
         private final NotificationConsumer<HookNotification> consumer;
         private final AtomicBoolean                          shouldRun      = new AtomicBoolean(false);
-        private final List<HookNotification>                 failedMessages = new ArrayList<>();
+        private final List<String>                           failedMessages = new ArrayList<>();
         private final AdaptiveWaiter                         adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration);
 
         @VisibleForTesting
@@ -523,26 +523,29 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                     } catch (Throwable e) {
                         RequestContext.get().resetEntityGuidUpdates();
 
-                        LOG.warn("Error handling message", e);
-                        try {
-                            LOG.info("Sleeping for {} ms before retry", consumerRetryInterval);
-
-                            Thread.sleep(consumerRetryInterval);
-                        } catch (InterruptedException ie) {
-                            LOG.error("Notification consumer thread sleep interrupted");
-                        }
-
                         if (numRetries == (maxRetries - 1)) {
-                            LOG.warn("Max retries exceeded for message {}", message, e);
+                            String strMessage = AbstractNotification.getMessageJson(message);
+
+                            LOG.warn("Max retries exceeded for message {}", strMessage, e);
 
                             isFailedMsg = true;
 
-                            failedMessages.add(message);
+                            failedMessages.add(strMessage);
 
                             if (failedMessages.size() >= failedMsgCacheSize) {
                                 recordFailedMessages();
                             }
                             return;
+                        } else {
+                            LOG.warn("Error handling message", e);
+
+                            try {
+                                LOG.info("Sleeping for {} ms before retry", consumerRetryInterval);
+
+                                Thread.sleep(consumerRetryInterval);
+                            } catch (InterruptedException ie) {
+                                LOG.error("Notification consumer thread sleep interrupted");
+                            }
                         }
                     } finally {
                         RequestContext.clear();
@@ -564,8 +567,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
         private void recordFailedMessages() {
             //logging failed messages
-            for (HookNotification message : failedMessages) {
-                FAILED_LOG.error("[DROPPED_NOTIFICATION] {}", AbstractNotification.getMessageJson(message));
+            for (String message : failedMessages) {
+                FAILED_LOG.error("[DROPPED_NOTIFICATION] {}", message);
             }
 
             failedMessages.clear();