You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by su...@apache.org on 2016/08/19 18:04:43 UTC
incubator-atlas git commit: ATLAS-1129 Remove notification failed logs on retry and add sleep between retries (svimal2106 via sumasai)
Repository: incubator-atlas
Updated Branches:
refs/heads/master ec94d2ad1 -> f408e93ee
ATLAS-1129 Remove notification failed logs on retry and add sleep between retries (svimal2106 via sumasai)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/f408e93e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/f408e93e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/f408e93e
Branch: refs/heads/master
Commit: f408e93eebfd870f7eef547438c41260f729fdd9
Parents: ec94d2a
Author: Suma Shivaprasad <su...@gmail.com>
Authored: Fri Aug 19 11:02:39 2016 -0700
Committer: Suma Shivaprasad <su...@gmail.com>
Committed: Fri Aug 19 11:02:39 2016 -0700
----------------------------------------------------------------------
distro/src/conf/atlas-application.properties | 3 +++
.../src/main/java/org/apache/atlas/hook/AtlasHook.java | 12 +++++++++++-
.../java/org/apache/atlas/kafka/KafkaNotification.java | 1 -
release-log.txt | 1 +
.../atlas/notification/NotificationHookConsumer.java | 12 +++++++++++-
5 files changed, 26 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f408e93e/distro/src/conf/atlas-application.properties
----------------------------------------------------------------------
diff --git a/distro/src/conf/atlas-application.properties b/distro/src/conf/atlas-application.properties
index 1b2cc81..d334600 100755
--- a/distro/src/conf/atlas-application.properties
+++ b/distro/src/conf/atlas-application.properties
@@ -65,6 +65,9 @@ atlas.kafka.auto.commit.enable=false
atlas.notification.create.topics=true
atlas.notification.replicas=1
atlas.notification.topics=ATLAS_HOOK,ATLAS_ENTITIES
+atlas.notification.log.failed.messages=true
+atlas.notification.consumer.retry.interval=500
+atlas.notification.hook.retry.interval=1000
# Enable for Kerberized Kafka clusters
#atlas.notification.kafka.service.principal=kafka/_HOST@EXAMPLE.COM
#atlas.notification.kafka.keytab.location=/etc/security/keytabs/kafka.service.keytab
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f408e93e/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
index 09b1c4b..93a10b4 100644
--- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
@@ -54,6 +54,8 @@ public abstract class AtlasHook {
private static boolean logFailedMessages;
private static FailedMessagesLogger failedMessagesLogger;
+ private static int notificationRetryInterval;
+ public static final String ATLAS_NOTIFICATION_RETRY_INTERVAL = "atlas.notification.hook.retry.interval";
public static final String ATLAS_NOTIFICATION_FAILED_MESSAGES_FILENAME_KEY =
"atlas.notification.failed.messages.filename";
@@ -76,6 +78,7 @@ public abstract class AtlasHook {
failedMessagesLogger.init();
}
+ notificationRetryInterval = atlasProperties.getInt(ATLAS_NOTIFICATION_RETRY_INTERVAL, 1000);
Injector injector = Guice.createInjector(new NotificationModule());
notifInterface = injector.getInstance(NotificationInterface.class);
@@ -128,7 +131,14 @@ public abstract class AtlasHook {
} catch (Exception e) {
numRetries++;
if (numRetries < maxRetries) {
- LOG.info("Failed to notify atlas for entity {}. Retrying", message, e);
+ LOG.error("Notification send retry failed");
+ try {
+ LOG.info("Sleeping for {} ms before retry", notificationRetryInterval);
+ Thread.sleep(notificationRetryInterval);
+ } catch (InterruptedException ie){
+ LOG.error("Notification hook thread sleep interrupted");
+ }
+
} else {
if (shouldLogFailedMessages && e instanceof NotificationException) {
List<String> failedMessages = ((NotificationException) e).getFailedMessages();
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f408e93e/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 806f2b4..2309ede 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -241,7 +241,6 @@ public class KafkaNotification extends AbstractNotification implements Service {
LOG.debug("Sent message for topic - {}, partition - {}, offset - {}", response.topic(),
response.partition(), response.offset());
} catch (Exception e) {
- LOG.warn("Could not send message - {}", context.getMessage(), e);
lastFailureException = e;
failedMessages.add(context.getMessage());
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f408e93e/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index f5e3441..98e61e8 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -10,6 +10,7 @@ ATLAS-1060 Add composite indexes for exact match performance improvements for al
ATLAS-1127 Modify creation and modification timestamps to Date instead of Long(sumasai)
ALL CHANGES:
+ATLAS-1129 Remove notification failed logs on retry and add sleep between retries (svimal2106 via sumasai)
ATLAS-1126 Fix NPE in getSchema calls (sumasai)
ATLAS-1125 Enable compression on hbase audit table (shwethags via sumasai)
ATLAS-1121 NPE while submitting topology in StormHook (ayubkhan via sumasai)
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f408e93e/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 6b1f3f2..a4fd1c2 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -54,11 +54,13 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries";
public static final String CONSUMER_FAILEDCACHESIZE_PROPERTY = "atlas.notification.hook.failedcachesize";
+ public static final String CONSUMER_RETRY_INTERVAL="atlas.notification.consumer.retry.interval";
public static final int SERVER_READY_WAIT_TIME_MS = 1000;
private final LocalAtlasClient atlasClient;
private final int maxRetries;
private final int failedMsgCacheSize;
+ private final int consumerRetryInterval;
private NotificationInterface notificationInterface;
private ExecutorService executors;
@@ -74,6 +76,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3);
failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 20);
+ consumerRetryInterval = applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500);
}
@@ -246,7 +249,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
break;
} catch (Throwable e) {
- LOG.warn("Error handling message", e);
+ LOG.warn("Error handling message" + e.getMessage());
+ try{
+ LOG.info("Sleeping for {} ms before retry", consumerRetryInterval);
+ Thread.sleep(consumerRetryInterval);
+ }catch (InterruptedException ie){
+ LOG.error("Notification consumer thread sleep interrupted");
+ }
+
if (numRetries == (maxRetries - 1)) {
LOG.warn("Max retries exceeded for message {}", message, e);
failedMessages.add(message);