You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2018/12/18 22:00:43 UTC
atlas git commit: ATLAS-2996: Conditionally Prevent Notification Processing. With support for HA mode.
Repository: atlas
Updated Branches:
refs/heads/master 5307e498a -> bd0c5a8a8
ATLAS-2996: Conditionally Prevent Notification Processing. With support for HA mode.
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/bd0c5a8a
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/bd0c5a8a
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/bd0c5a8a
Branch: refs/heads/master
Commit: bd0c5a8a8895b0ab52222e47beeefaab994a1099
Parents: 5307e49
Author: Ashutosh Mestry <am...@hortonworks.com>
Authored: Tue Dec 18 13:13:49 2018 -0800
Committer: Ashutosh Mestry <am...@hortonworks.com>
Committed: Tue Dec 18 13:13:49 2018 -0800
----------------------------------------------------------------------
.../atlas/notification/NotificationHookConsumer.java | 15 +++++++++++++--
1 file changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/atlas/blob/bd0c5a8a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index b955948..b344c50 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -161,8 +161,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
@Override
public void start() throws AtlasException {
if (consumerDisabled) {
- LOG.info("Hook consumer stopped. No hook messages will be processed. " +
- "Set property '{}' to false to start consuming hook messages.", CONSUMER_DISABLED);
+ LOG.info("No hook messages will be processed. {} = {}", CONSUMER_DISABLED, consumerDisabled);
return;
}
@@ -205,6 +204,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
public void stop() {
//Allow for completion of outstanding work
try {
+ if (consumerDisabled) {
+ return;
+ }
+
stopConsumerThreads();
if (executors != null) {
executors.shutdown();
@@ -244,6 +247,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
*/
@Override
public void instanceIsActive() {
+ if (consumerDisabled) {
+ return;
+ }
+
LOG.info("Reacting to active state: initializing Kafka consumers");
startConsumers(executors);
@@ -257,6 +264,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
*/
@Override
public void instanceIsPassive() {
+ if (consumerDisabled) {
+ return;
+ }
+
LOG.info("Reacting to passive state: shutting down Kafka consumers.");
stop();