You are viewing a plain text version of this content. The canonical version is on the original mailing-list archive page (the hyperlink was not preserved in this plain text copy).
Posted to commits@atlas.apache.org by su...@apache.org on 2016/08/19 18:04:43 UTC

incubator-atlas git commit: ATLAS-1129 Remove notification failed logs on retry and add sleep between retries (svimal2106 via sumasai)

Repository: incubator-atlas
Updated Branches:
  refs/heads/master ec94d2ad1 -> f408e93ee


ATLAS-1129 Remove notification failed logs on retry and add sleep between retries (svimal2106 via sumasai)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/f408e93e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/f408e93e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/f408e93e

Branch: refs/heads/master
Commit: f408e93eebfd870f7eef547438c41260f729fdd9
Parents: ec94d2a
Author: Suma Shivaprasad <su...@gmail.com>
Authored: Fri Aug 19 11:02:39 2016 -0700
Committer: Suma Shivaprasad <su...@gmail.com>
Committed: Fri Aug 19 11:02:39 2016 -0700

----------------------------------------------------------------------
 distro/src/conf/atlas-application.properties            |  3 +++
 .../src/main/java/org/apache/atlas/hook/AtlasHook.java  | 12 +++++++++++-
 .../java/org/apache/atlas/kafka/KafkaNotification.java  |  1 -
 release-log.txt                                         |  1 +
 .../atlas/notification/NotificationHookConsumer.java    | 12 +++++++++++-
 5 files changed, 26 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f408e93e/distro/src/conf/atlas-application.properties
----------------------------------------------------------------------
diff --git a/distro/src/conf/atlas-application.properties b/distro/src/conf/atlas-application.properties
index 1b2cc81..d334600 100755
--- a/distro/src/conf/atlas-application.properties
+++ b/distro/src/conf/atlas-application.properties
@@ -65,6 +65,9 @@ atlas.kafka.auto.commit.enable=false
 atlas.notification.create.topics=true
 atlas.notification.replicas=1
 atlas.notification.topics=ATLAS_HOOK,ATLAS_ENTITIES
+atlas.notification.log.failed.messages=true
+atlas.notification.consumer.retry.interval=500
+atlas.notification.hook.retry.interval=1000
 # Enable for Kerberized Kafka clusters
 #atlas.notification.kafka.service.principal=kafka/_HOST@EXAMPLE.COM
 #atlas.notification.kafka.keytab.location=/etc/security/keytabs/kafka.service.keytab

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f408e93e/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
index 09b1c4b..93a10b4 100644
--- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
@@ -54,6 +54,8 @@ public abstract class AtlasHook {
 
     private static boolean logFailedMessages;
     private static FailedMessagesLogger failedMessagesLogger;
+    private static int notificationRetryInterval;
+    public static final String ATLAS_NOTIFICATION_RETRY_INTERVAL = "atlas.notification.hook.retry.interval";
 
     public static final String ATLAS_NOTIFICATION_FAILED_MESSAGES_FILENAME_KEY =
             "atlas.notification.failed.messages.filename";
@@ -76,6 +78,7 @@ public abstract class AtlasHook {
             failedMessagesLogger.init();
         }
 
+        notificationRetryInterval = atlasProperties.getInt(ATLAS_NOTIFICATION_RETRY_INTERVAL, 1000);
         Injector injector = Guice.createInjector(new NotificationModule());
         notifInterface = injector.getInstance(NotificationInterface.class);
 
@@ -128,7 +131,14 @@ public abstract class AtlasHook {
             } catch (Exception e) {
                 numRetries++;
                 if (numRetries < maxRetries) {
-                    LOG.info("Failed to notify atlas for entity {}. Retrying", message, e);
+                    LOG.error("Notification send retry failed");
+                    try {
+                        LOG.info("Sleeping for {} ms before retry", notificationRetryInterval);
+                        Thread.sleep(notificationRetryInterval);
+                    } catch (InterruptedException ie){
+                        LOG.error("Notification hook thread sleep interrupted");
+                    }
+
                 } else {
                     if (shouldLogFailedMessages && e instanceof NotificationException) {
                         List<String> failedMessages = ((NotificationException) e).getFailedMessages();

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f408e93e/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 806f2b4..2309ede 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -241,7 +241,6 @@ public class KafkaNotification extends AbstractNotification implements Service {
                 LOG.debug("Sent message for topic - {}, partition - {}, offset - {}", response.topic(),
                     response.partition(), response.offset());
             } catch (Exception e) {
-                LOG.warn("Could not send message - {}", context.getMessage(), e);
                 lastFailureException = e;
                 failedMessages.add(context.getMessage());
             }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f408e93e/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index f5e3441..98e61e8 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -10,6 +10,7 @@ ATLAS-1060 Add composite indexes for exact match performance improvements for al
 ATLAS-1127 Modify creation and modification timestamps to Date instead of Long(sumasai)
 
 ALL CHANGES:
+ATLAS-1129 Remove notification failed logs on retry and add sleep between retries (svimal2106 via sumasai)
 ATLAS-1126 Fix NPE in getSchema calls (sumasai)
 ATLAS-1125 Enable compression on hbase audit table (shwethags via sumasai)
 ATLAS-1121 NPE while submitting topology in StormHook (ayubkhan via sumasai)

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f408e93e/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 6b1f3f2..a4fd1c2 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -54,11 +54,13 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
     public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries";
     public static final String CONSUMER_FAILEDCACHESIZE_PROPERTY = "atlas.notification.hook.failedcachesize";
+    public static final String CONSUMER_RETRY_INTERVAL="atlas.notification.consumer.retry.interval";
 
     public static final int SERVER_READY_WAIT_TIME_MS = 1000;
     private final LocalAtlasClient atlasClient;
     private final int maxRetries;
     private final int failedMsgCacheSize;
+    private final int consumerRetryInterval;
 
     private NotificationInterface notificationInterface;
     private ExecutorService executors;
@@ -74,6 +76,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
         maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3);
         failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 20);
+        consumerRetryInterval = applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500);
 
     }
 
@@ -246,7 +249,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
                     break;
                 } catch (Throwable e) {
-                    LOG.warn("Error handling message", e);
+                    LOG.warn("Error handling message" + e.getMessage());
+                    try{
+                        LOG.info("Sleeping for {} ms before retry", consumerRetryInterval);
+                        Thread.sleep(consumerRetryInterval);
+                    }catch (InterruptedException ie){
+                        LOG.error("Notification consumer thread sleep interrupted");
+                    }
+
                     if (numRetries == (maxRetries - 1)) {
                         LOG.warn("Max retries exceeded for message {}", message, e);
                         failedMessages.add(message);