You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2018/09/18 17:16:43 UTC
atlas git commit: ATLAS-2877: updated notification processing to wait only before retry
Repository: atlas
Updated Branches:
refs/heads/master 3176d1a1e -> 48e522497
ATLAS-2877: updated notification processing to wait only before retry
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/48e52249
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/48e52249
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/48e52249
Branch: refs/heads/master
Commit: 48e522497f960ac9cc54853c3cb0e9af3e0111f3
Parents: 3176d1a
Author: Madhan Neethiraj <ma...@apache.org>
Authored: Tue Sep 18 09:01:25 2018 -0700
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Tue Sep 18 09:10:59 2018 -0700
----------------------------------------------------------------------
.../notification/NotificationHookConsumer.java | 31 +++++++++++---------
1 file changed, 17 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/atlas/blob/48e52249/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index d680e4e..9f832b9 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -294,7 +294,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
class HookConsumer extends ShutdownableThread {
private final NotificationConsumer<HookNotification> consumer;
private final AtomicBoolean shouldRun = new AtomicBoolean(false);
- private final List<HookNotification> failedMessages = new ArrayList<>();
+ private final List<String> failedMessages = new ArrayList<>();
private final AdaptiveWaiter adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration);
@VisibleForTesting
@@ -523,26 +523,29 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
} catch (Throwable e) {
RequestContext.get().resetEntityGuidUpdates();
- LOG.warn("Error handling message", e);
- try {
- LOG.info("Sleeping for {} ms before retry", consumerRetryInterval);
-
- Thread.sleep(consumerRetryInterval);
- } catch (InterruptedException ie) {
- LOG.error("Notification consumer thread sleep interrupted");
- }
-
if (numRetries == (maxRetries - 1)) {
- LOG.warn("Max retries exceeded for message {}", message, e);
+ String strMessage = AbstractNotification.getMessageJson(message);
+
+ LOG.warn("Max retries exceeded for message {}", strMessage, e);
isFailedMsg = true;
- failedMessages.add(message);
+ failedMessages.add(strMessage);
if (failedMessages.size() >= failedMsgCacheSize) {
recordFailedMessages();
}
return;
+ } else {
+ LOG.warn("Error handling message", e);
+
+ try {
+ LOG.info("Sleeping for {} ms before retry", consumerRetryInterval);
+
+ Thread.sleep(consumerRetryInterval);
+ } catch (InterruptedException ie) {
+ LOG.error("Notification consumer thread sleep interrupted");
+ }
}
} finally {
RequestContext.clear();
@@ -564,8 +567,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private void recordFailedMessages() {
//logging failed messages
- for (HookNotification message : failedMessages) {
- FAILED_LOG.error("[DROPPED_NOTIFICATION] {}", AbstractNotification.getMessageJson(message));
+ for (String message : failedMessages) {
+ FAILED_LOG.error("[DROPPED_NOTIFICATION] {}", message);
}
failedMessages.clear();