You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/11/07 22:39:24 UTC

[2/4] atlas git commit: ATLAS-2251: notification module updates (#4)

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 779298a..456a778 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -29,13 +29,14 @@ import org.apache.atlas.RequestContextV1;
 import org.apache.atlas.ha.HAConfiguration;
 import org.apache.atlas.kafka.AtlasKafkaMessage;
 import org.apache.atlas.listener.ActiveStateChangeHandler;
-import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.notification.NotificationInterface.NotificationType;
 import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.v1.model.notification.HookNotification.EntityCreateRequest;
-import org.apache.atlas.v1.model.notification.HookNotification.EntityDeleteRequest;
-import org.apache.atlas.v1.model.notification.HookNotification.EntityPartialUpdateRequest;
-import org.apache.atlas.v1.model.notification.HookNotification.EntityUpdateRequest;
-import org.apache.atlas.v1.model.notification.HookNotification.HookNotificationMessage;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityDeleteRequest;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityPartialUpdateRequest;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityUpdateRequest;
 import org.apache.atlas.repository.converters.AtlasInstanceConverter;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream;
@@ -56,10 +57,7 @@ import org.springframework.core.annotation.Order;
 import org.springframework.stereotype.Component;
 
 import javax.inject.Inject;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -76,37 +74,37 @@ import static org.apache.atlas.AtlasClientV2.API_V2.UPDATE_ENTITY_BY_ATTRIBUTE;
 @Order(4)
 @DependsOn(value = {"atlasTypeDefStoreInitializer", "atlasTypeDefGraphStoreV1"})
 public class NotificationHookConsumer implements Service, ActiveStateChangeHandler {
-    private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class);
-    private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger(NotificationHookConsumer.class);
-    private static final String LOCALHOST = "localhost";
-    private static Logger FAILED_LOG = LoggerFactory.getLogger("FAILED");
+    private static final Logger LOG        = LoggerFactory.getLogger(NotificationHookConsumer.class);
+    private static final Logger PERF_LOG   = AtlasPerfTracer.getPerfLogger(NotificationHookConsumer.class);
+    private static final Logger FAILED_LOG = LoggerFactory.getLogger("FAILED");
 
+    private static final String LOCALHOST         = "localhost";
     private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName();
 
-    public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
-    public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries";
+    public static final String CONSUMER_THREADS_PROPERTY         = "atlas.notification.hook.numthreads";
+    public static final String CONSUMER_RETRIES_PROPERTY         = "atlas.notification.hook.maxretries";
     public static final String CONSUMER_FAILEDCACHESIZE_PROPERTY = "atlas.notification.hook.failedcachesize";
-    public static final String CONSUMER_RETRY_INTERVAL = "atlas.notification.consumer.retry.interval";
-    public static final String CONSUMER_MIN_RETRY_INTERVAL = "atlas.notification.consumer.min.retry.interval";
-    public static final String CONSUMER_MAX_RETRY_INTERVAL = "atlas.notification.consumer.max.retry.interval";
-
+    public static final String CONSUMER_RETRY_INTERVAL           = "atlas.notification.consumer.retry.interval";
+    public static final String CONSUMER_MIN_RETRY_INTERVAL       = "atlas.notification.consumer.min.retry.interval";
+    public static final String CONSUMER_MAX_RETRY_INTERVAL       = "atlas.notification.consumer.max.retry.interval";
 
     public static final int SERVER_READY_WAIT_TIME_MS = 1000;
-    private final AtlasEntityStore atlasEntityStore;
-    private final ServiceState serviceState;
+
+    private final AtlasEntityStore       atlasEntityStore;
+    private final ServiceState           serviceState;
     private final AtlasInstanceConverter instanceConverter;
-    private final AtlasTypeRegistry typeRegistry;
-    private final int maxRetries;
-    private final int failedMsgCacheSize;
+    private final AtlasTypeRegistry      typeRegistry;
+    private final int                    maxRetries;
+    private final int                    failedMsgCacheSize;
+    private final int                    minWaitDuration;
+    private final int                    maxWaitDuration;
+
+    private NotificationInterface notificationInterface;
+    private ExecutorService       executors;
+    private Configuration         applicationProperties;
 
     @VisibleForTesting
     final int consumerRetryInterval;
-    private final int minWaitDuration;
-    private final int maxWaitDuration;
-
-    private NotificationInterface notificationInterface;
-    private ExecutorService executors;
-    private Configuration applicationProperties;
 
     @VisibleForTesting
     List<HookConsumer> consumers;
@@ -116,18 +114,17 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                                     ServiceState serviceState, AtlasInstanceConverter instanceConverter,
                                     AtlasTypeRegistry typeRegistry) throws AtlasException {
         this.notificationInterface = notificationInterface;
-        this.atlasEntityStore = atlasEntityStore;
-        this.serviceState = serviceState;
-        this.instanceConverter = instanceConverter;
-        this.typeRegistry = typeRegistry;
-
+        this.atlasEntityStore      = atlasEntityStore;
+        this.serviceState          = serviceState;
+        this.instanceConverter     = instanceConverter;
+        this.typeRegistry          = typeRegistry;
         this.applicationProperties = ApplicationProperties.get();
 
-        maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3);
-        failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 20);
+        maxRetries            = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3);
+        failedMsgCacheSize    = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 20);
         consumerRetryInterval = applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500);
-        minWaitDuration = applicationProperties.getInt(CONSUMER_MIN_RETRY_INTERVAL, consumerRetryInterval); // 500 ms  by default
-        maxWaitDuration = applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 60);  //  30 sec by default
+        minWaitDuration       = applicationProperties.getInt(CONSUMER_MIN_RETRY_INTERVAL, consumerRetryInterval); // 500 ms  by default
+        maxWaitDuration       = applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 60);  //  30 sec by default
     }
 
     @Override
@@ -144,21 +141,24 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         }
         if (!HAConfiguration.isHAEnabled(configuration)) {
             LOG.info("HA is disabled, starting consumers inline.");
+
             startConsumers(executorService);
         }
     }
 
     private void startConsumers(ExecutorService executorService) {
-        int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
-        List<NotificationConsumer<HookNotificationMessage>> notificationConsumers =
-                notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads);
+        int                                          numThreads            = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
+        List<NotificationConsumer<HookNotification>> notificationConsumers = notificationInterface.createConsumers(NotificationType.HOOK, numThreads);
+
         if (executorService == null) {
-            executorService = Executors.newFixedThreadPool(notificationConsumers.size(),
-                    new ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").build());
+            executorService = Executors.newFixedThreadPool(notificationConsumers.size(), new ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").build());
         }
+
         executors = executorService;
-        for (final NotificationConsumer<HookNotificationMessage> consumer : notificationConsumers) {
+
+        for (final NotificationConsumer<HookNotification> consumer : notificationConsumers) {
             HookConsumer hookConsumer = new HookConsumer(consumer);
+
             consumers.add(hookConsumer);
             executors.submit(hookConsumer);
         }
@@ -171,11 +171,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             stopConsumerThreads();
             if (executors != null) {
                 executors.shutdown();
+
                 if (!executors.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                     LOG.error("Timed out waiting for consumer threads to shut down, exiting uncleanly");
                 }
+
                 executors = null;
             }
+
             notificationInterface.close();
         } catch (InterruptedException e) {
             LOG.error("Failure in shutting down consumers");
@@ -189,6 +192,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             for (HookConsumer consumer : consumers) {
                 consumer.shutdown();
             }
+
             consumers.clear();
         }
 
@@ -204,6 +208,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     @Override
     public void instanceIsActive() {
         LOG.info("Reacting to active state: initializing Kafka consumers");
+
         startConsumers(executors);
     }
 
@@ -216,6 +221,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     @Override
     public void instanceIsPassive() {
         LOG.info("Reacting to passive state: shutting down Kafka consumers.");
+
         stop();
     }
 
@@ -235,18 +241,17 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         private final long maxDuration;
         private final long minDuration;
         private final long resetInterval;
+        private       long lastWaitAt;
 
-        private long lastWaitAt;
         @VisibleForTesting
         long waitDuration;
 
         public AdaptiveWaiter(long minDuration, long maxDuration, long increment) {
-            this.minDuration = minDuration;
-            this.maxDuration = maxDuration;
-            this.increment = increment;
-
-            this.waitDuration = minDuration;
-            this.lastWaitAt = 0;
+            this.minDuration   = minDuration;
+            this.maxDuration   = maxDuration;
+            this.increment     = increment;
+            this.waitDuration  = minDuration;
+            this.lastWaitAt    = 0;
             this.resetInterval = maxDuration * 2;
         }
 
@@ -268,7 +273,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
         private void setWaitDurations() {
             long timeSinceLastWait = (lastWaitAt == 0) ? 0 : System.currentTimeMillis() - lastWaitAt;
+
             lastWaitAt = System.currentTimeMillis();
+
             if (timeSinceLastWait > resetInterval) {
                 waitDuration = minDuration;
             } else {
@@ -282,14 +289,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
     @VisibleForTesting
     class HookConsumer extends ShutdownableThread {
-        private final NotificationConsumer<HookNotificationMessage> consumer;
-        private final AtomicBoolean shouldRun = new AtomicBoolean(false);
-        private List<HookNotificationMessage> failedMessages = new ArrayList<>();
-
-        private final AdaptiveWaiter adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration);
+        private final NotificationConsumer<HookNotification> consumer;
+        private final AtomicBoolean                          shouldRun      = new AtomicBoolean(false);
+        private final List<HookNotification>                 failedMessages = new ArrayList<>();
+        private final AdaptiveWaiter                         adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration);
 
-        public HookConsumer(NotificationConsumer<HookNotificationMessage> consumer) {
+        public HookConsumer(NotificationConsumer<HookNotification> consumer) {
             super("atlas-hook-consumer-thread", false);
+
             this.consumer = consumer;
         }
 
@@ -306,8 +313,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             try {
                 while (shouldRun.get()) {
                     try {
-                        List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive();
-                        for (AtlasKafkaMessage<HookNotificationMessage> msg : messages) {
+                        List<AtlasKafkaMessage<HookNotification>> messages = consumer.receive();
+
+                        for (AtlasKafkaMessage<HookNotification> msg : messages) {
                             handleMessage(msg);
                         }
                     } catch (IllegalStateException ex) {
@@ -315,6 +323,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                     } catch (Exception e) {
                         if (shouldRun.get()) {
                             LOG.warn("Exception in NotificationHookConsumer", e);
+
                             adaptiveWaiter.pause(e);
                         } else {
                             break;
@@ -324,6 +333,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             } finally {
                 if (consumer != null) {
                     LOG.info("closing NotificationConsumer");
+
                     consumer.close();
                 }
 
@@ -332,11 +342,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         }
 
         @VisibleForTesting
-        void handleMessage(AtlasKafkaMessage<HookNotificationMessage> kafkaMsg) throws AtlasServiceException, AtlasException {
-            AtlasPerfTracer perf = null;
-
-            HookNotificationMessage message = kafkaMsg.getMessage();
-            String messageUser = message.getUser();
+        void handleMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) throws AtlasServiceException, AtlasException {
+            AtlasPerfTracer  perf        = null;
+            HookNotification message     = kafkaMsg.getMessage();
+            String           messageUser = message.getUser();
 
             if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
                 perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, message.getType().name());
@@ -344,21 +353,25 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
             try {
                 // Used for intermediate conversions during create and update
-                AtlasEntity.AtlasEntitiesWithExtInfo entities = null;
+                AtlasEntitiesWithExtInfo entities = null;
+
                 for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("handleMessage({}): attempt {}", message.getType().name(), numRetries);
                     }
+
                     try {
                         RequestContextV1 requestContext = RequestContextV1.get();
+
                         requestContext.setUser(messageUser);
 
                         switch (message.getType()) {
                             case ENTITY_CREATE:
-                                EntityCreateRequest createRequest = (EntityCreateRequest) message;
+                                final EntityCreateRequest createRequest = (EntityCreateRequest) message;
 
                                 if (numRetries == 0) { // audit only on the first attempt
                                     AtlasBaseClient.API api = AtlasClient.API_V1.CREATE_ENTITY;
+
                                     audit(messageUser, api.getMethod(), api.getNormalizedPath());
                                 }
 
@@ -372,19 +385,16 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
                                 if (numRetries == 0) { // audit only on the first attempt
                                     AtlasBaseClient.API api = UPDATE_ENTITY_BY_ATTRIBUTE;
-                                    audit(messageUser, api.getMethod(),
-                                          String.format(api.getNormalizedPath(), partialUpdateRequest.getTypeName()));
+
+                                    audit(messageUser, api.getMethod(), String.format(api.getNormalizedPath(), partialUpdateRequest.getTypeName()));
                                 }
 
                                 Referenceable referenceable = partialUpdateRequest.getEntity();
+
                                 entities = instanceConverter.toAtlasEntity(referenceable);
 
                                 AtlasEntityType entityType = typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName());
-                                String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, new HashMap<String, Object>() {
-                                    {
-                                        put(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue());
-                                    }
-                                });
+                                String          guid       = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, Collections.singletonMap(partialUpdateRequest.getAttribute(), (Object)partialUpdateRequest.getAttributeValue()));
 
                                 // There should only be one root entity
                                 entities.getEntities().get(0).setGuid(guid);
@@ -397,30 +407,30 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
                                 if (numRetries == 0) { // audit only on the first attempt
                                     AtlasBaseClient.API api = DELETE_ENTITY_BY_ATTRIBUTE;
-                                    audit(messageUser, api.getMethod(),
-                                          String.format(api.getNormalizedPath(), deleteRequest.getTypeName()));
+
+                                    audit(messageUser, api.getMethod(), String.format(api.getNormalizedPath(), deleteRequest.getTypeName()));
                                 }
 
                                 try {
                                     AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(deleteRequest.getTypeName());
-                                    atlasEntityStore.deleteByUniqueAttributes(type,
-                                            new HashMap<String, Object>() {{
-                                                put(deleteRequest.getAttribute(), deleteRequest.getAttributeValue());
-                                            }});
+
+                                    atlasEntityStore.deleteByUniqueAttributes(type, Collections.singletonMap(deleteRequest.getAttribute(), (Object) deleteRequest.getAttributeValue()));
                                 } catch (ClassCastException cle) {
                                     LOG.error("Failed to do a partial update on Entity");
                                 }
                                 break;
 
                             case ENTITY_FULL_UPDATE:
-                                EntityUpdateRequest updateRequest = (EntityUpdateRequest) message;
+                                final EntityUpdateRequest updateRequest = (EntityUpdateRequest) message;
 
                                 if (numRetries == 0) { // audit only on the first attempt
                                     AtlasBaseClient.API api = UPDATE_ENTITY;
+
                                     audit(messageUser, api.getMethod(), api.getNormalizedPath());
                                 }
 
                                 entities = instanceConverter.toAtlasEntities(updateRequest.getEntities());
+
                                 atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
                                 break;
 
@@ -433,6 +443,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                         LOG.warn("Error handling message", e);
                         try {
                             LOG.info("Sleeping for {} ms before retry", consumerRetryInterval);
+
                             Thread.sleep(consumerRetryInterval);
                         } catch (InterruptedException ie) {
                             LOG.error("Notification consumer thread sleep interrupted");
@@ -440,7 +451,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
                         if (numRetries == (maxRetries - 1)) {
                             LOG.warn("Max retries exceeded for message {}", message, e);
+
                             failedMessages.add(message);
+
                             if (failedMessages.size() >= failedMsgCacheSize) {
                                 recordFailedMessages();
                             }
@@ -458,15 +471,18 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
         private void recordFailedMessages() {
             //logging failed messages
-            for (HookNotificationMessage message : failedMessages) {
+            for (HookNotification message : failedMessages) {
                 FAILED_LOG.error("[DROPPED_NOTIFICATION] {}", AbstractNotification.getMessageJson(message));
             }
+
             failedMessages.clear();
         }
 
-        private void commit(AtlasKafkaMessage<HookNotificationMessage> kafkaMessage) {
+        private void commit(AtlasKafkaMessage<HookNotification> kafkaMessage) {
             recordFailedMessages();
+
             TopicPartition partition = new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition());
+
             consumer.commit(partition, kafkaMessage.getOffset() + 1);
         }
 
@@ -474,22 +490,23 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             try {
                 while (serviceState.getState() != ServiceState.ServiceStateValue.ACTIVE) {
                     try {
-                        LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...",
-                                SERVER_READY_WAIT_TIME_MS);
+                        LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...", SERVER_READY_WAIT_TIME_MS);
+
                         timer.sleep(SERVER_READY_WAIT_TIME_MS);
                     } catch (InterruptedException e) {
-                        LOG.info("Interrupted while waiting for Atlas Server to become ready, "
-                                + "exiting consumer thread.", e);
+                        LOG.info("Interrupted while waiting for Atlas Server to become ready, " + "exiting consumer thread.", e);
+
                         return false;
                     }
                 }
             } catch (Throwable e) {
-                LOG.info(
-                        "Handled AtlasServiceException while waiting for Atlas Server to become ready, "
-                                + "exiting consumer thread.", e);
+                LOG.info("Handled AtlasServiceException while waiting for Atlas Server to become ready, exiting consumer thread.", e);
+
                 return false;
             }
+
             LOG.info("Atlas Server is ready, can start reading Kafka events.");
+
             return true;
         }
 
@@ -504,12 +521,15 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             }
 
             super.initiateShutdown();
+
             shouldRun.set(false);
+
             if (consumer != null) {
                 consumer.wakeup();
             }
 
             super.awaitShutdown();
+
             LOG.info("<== HookConsumer shutdown()");
         }
     }
@@ -519,7 +539,6 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             LOG.debug("==> audit({},{}, {})", messageUser, method, path);
         }
 
-        AuditFilter.audit(messageUser, THREADNAME_PREFIX, method, LOCALHOST, path, LOCALHOST,
-                DateTimeHelper.formatDateUTC(new Date()));
+        AuditFilter.audit(messageUser, THREADNAME_PREFIX, method, LOCALHOST, path, LOCALHOST, DateTimeHelper.formatDateUTC(new Date()));
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
index 517d25f..5baafeb 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
@@ -21,11 +21,13 @@ package org.apache.atlas.notification;
 import com.google.common.collect.ImmutableSet;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.kafka.NotificationProvider;
+import org.apache.atlas.notification.NotificationInterface.NotificationType;
 import org.apache.atlas.v1.model.instance.Id;
 import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.atlas.v1.model.instance.Struct;
-import org.apache.atlas.v1.model.typedef.TraitTypeDefinition;
-import org.apache.atlas.v1.model.notification.EntityNotification;
+import org.apache.atlas.v1.model.notification.EntityNotificationV1;
+import org.apache.atlas.v1.model.notification.EntityNotificationV1.OperationType;
+import org.apache.atlas.v1.model.typedef.*;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.v1.typesystem.types.utils.TypesUtil;
 import org.apache.atlas.web.integration.BaseResourceIT;
@@ -42,33 +44,35 @@ import static org.testng.Assert.assertTrue;
  * Entity Notification Integration Tests.
  */
 public class EntityNotificationIT extends BaseResourceIT {
-
-    private final String DATABASE_NAME = "db" + randomString();
-    private final String TABLE_NAME = "table" + randomString();
-    private NotificationInterface notificationInterface = NotificationProvider.get();
-    private Id tableId;
-    private Id dbId;
-    private String traitName;
-    private NotificationConsumer notificationConsumer;
+    private final String                DATABASE_NAME         = "db" + randomString();
+    private final String                TABLE_NAME            = "table" + randomString();
+    private final NotificationInterface notificationInterface = NotificationProvider.get();
+    private       Id                    tableId;
+    private       Id                    dbId;
+    private       String                traitName;
+    private       NotificationConsumer  notificationConsumer;
 
     @BeforeClass
     public void setUp() throws Exception {
         super.setUp();
+
         createTypeDefinitionsV1();
+
         Referenceable HiveDBInstance = createHiveDBInstanceBuiltIn(DATABASE_NAME);
+
         dbId = createInstance(HiveDBInstance);
 
-        notificationConsumer = notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1).get(0);
+        notificationConsumer = notificationInterface.createConsumers(NotificationType.ENTITIES, 1).get(0);
     }
 
     public void testCreateEntity() throws Exception {
         Referenceable tableInstance = createHiveTableInstanceBuiltIn(DATABASE_NAME, TABLE_NAME, dbId);
+
         tableId = createInstance(tableInstance);
 
         final String guid = tableId._getId();
 
-        waitForNotification(notificationConsumer, MAX_WAIT_TIME,
-                newNotificationPredicate(EntityNotification.OperationType.ENTITY_CREATE, HIVE_TABLE_TYPE_BUILTIN, guid));
+        waitForNotification(notificationConsumer, MAX_WAIT_TIME, newNotificationPredicate(OperationType.ENTITY_CREATE, HIVE_TABLE_TYPE_BUILTIN, guid));
     }
 
     public void testUpdateEntity() throws Exception {
@@ -79,83 +83,83 @@ public class EntityNotificationIT extends BaseResourceIT {
 
         atlasClientV1.updateEntityAttribute(guid, property, newValue);
 
-        waitForNotification(notificationConsumer, MAX_WAIT_TIME,
-                newNotificationPredicate(EntityNotification.OperationType.ENTITY_UPDATE, HIVE_TABLE_TYPE_BUILTIN, guid));
+        waitForNotification(notificationConsumer, MAX_WAIT_TIME, newNotificationPredicate(OperationType.ENTITY_UPDATE, HIVE_TABLE_TYPE_BUILTIN, guid));
     }
 
     public void testDeleteEntity() throws Exception {
-        final String tableName = "table-" + randomString();
-        final String dbName = "db-" + randomString();
-        Referenceable HiveDBInstance = createHiveDBInstanceBuiltIn(dbName);
-        Id dbId = createInstance(HiveDBInstance);
+        final String        tableName      = "table-" + randomString();
+        final String        dbName         = "db-" + randomString();
+        final Referenceable HiveDBInstance = createHiveDBInstanceBuiltIn(dbName);
+        final Id            dbId           = createInstance(HiveDBInstance);
+        final Referenceable tableInstance  = createHiveTableInstanceBuiltIn(dbName, tableName, dbId);
+        final Id            tableId        = createInstance(tableInstance);
+        final String        guid           = tableId._getId();
 
-        Referenceable tableInstance = createHiveTableInstanceBuiltIn(dbName, tableName, dbId);
-        final Id tableId = createInstance(tableInstance);
-        final String guid = tableId._getId();
-
-        waitForNotification(notificationConsumer, MAX_WAIT_TIME,
-            newNotificationPredicate(EntityNotification.OperationType.ENTITY_CREATE, HIVE_TABLE_TYPE_BUILTIN, guid));
+        waitForNotification(notificationConsumer, MAX_WAIT_TIME, newNotificationPredicate(OperationType.ENTITY_CREATE, HIVE_TABLE_TYPE_BUILTIN, guid));
 
         final String name = (String) tableInstance.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME);
 
         atlasClientV1.deleteEntity(HIVE_TABLE_TYPE_BUILTIN, AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
 
-        waitForNotification(notificationConsumer, MAX_WAIT_TIME,
-            newNotificationPredicate(EntityNotification.OperationType.ENTITY_DELETE, HIVE_TABLE_TYPE_BUILTIN, guid));
+        waitForNotification(notificationConsumer, MAX_WAIT_TIME, newNotificationPredicate(OperationType.ENTITY_DELETE, HIVE_TABLE_TYPE_BUILTIN, guid));
     }
 
     public void testAddTrait() throws Exception {
         String superSuperTraitName = "SuperTrait" + randomString();
-        createTrait(superSuperTraitName);
-
-        String superTraitName = "SuperTrait" + randomString();
-        createTrait(superTraitName, superSuperTraitName);
+        String superTraitName      = "SuperTrait" + randomString();
 
         traitName = "Trait" + randomString();
+
+        createTrait(superSuperTraitName);
+        createTrait(superTraitName, superSuperTraitName);
         createTrait(traitName, superTraitName);
 
-        Struct traitInstance = new Struct(traitName);
+        Struct traitInstance     = new Struct(traitName);
         String traitInstanceJSON = AtlasType.toV1Json(traitInstance);
+
         LOG.debug("Trait instance = {}", traitInstanceJSON);
 
         final String guid = tableId._getId();
 
         atlasClientV1.addTrait(guid, traitInstance);
 
-        EntityNotification entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME,
-                newNotificationPredicate(EntityNotification.OperationType.TRAIT_ADD, HIVE_TABLE_TYPE_BUILTIN, guid));
+        EntityNotificationV1 entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME, newNotificationPredicate(OperationType.TRAIT_ADD, HIVE_TABLE_TYPE_BUILTIN, guid));
 
         Referenceable entity = entityNotification.getEntity();
+
         assertTrue(entity.getTraitNames().contains(traitName));
 
-        List<Struct> allTraits = entityNotification.getAllTraits();
+        List<Struct> allTraits     = entityNotification.getAllTraits();
         List<String> allTraitNames = new LinkedList<>();
 
         for (Struct struct : allTraits) {
             allTraitNames.add(struct.getTypeName());
         }
+
         assertTrue(allTraitNames.contains(traitName));
         assertTrue(allTraitNames.contains(superTraitName));
         assertTrue(allTraitNames.contains(superSuperTraitName));
 
         String anotherTraitName = "Trait" + randomString();
+
         createTrait(anotherTraitName, superTraitName);
 
-        traitInstance = new Struct(anotherTraitName);
+        traitInstance     = new Struct(anotherTraitName);
         traitInstanceJSON = AtlasType.toV1Json(traitInstance);
+
         LOG.debug("Trait instance = {}", traitInstanceJSON);
 
         atlasClientV1.addTrait(guid, traitInstance);
 
-        entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME,
-                newNotificationPredicate(EntityNotification.OperationType.TRAIT_ADD, HIVE_TABLE_TYPE_BUILTIN, guid));
+        entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME, newNotificationPredicate(OperationType.TRAIT_ADD, HIVE_TABLE_TYPE_BUILTIN, guid));
 
-        allTraits = entityNotification.getAllTraits();
+        allTraits     = entityNotification.getAllTraits();
         allTraitNames = new LinkedList<>();
 
         for (Struct struct : allTraits) {
             allTraitNames.add(struct.getTypeName());
         }
+
         assertTrue(allTraitNames.contains(traitName));
         assertTrue(allTraitNames.contains(anotherTraitName));
         // verify that the super type shows up twice in all traits
@@ -167,8 +171,8 @@ public class EntityNotificationIT extends BaseResourceIT {
 
         atlasClientV1.deleteTrait(guid, traitName);
 
-        EntityNotification entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME,
-                newNotificationPredicate(EntityNotification.OperationType.TRAIT_DELETE, HIVE_TABLE_TYPE_BUILTIN, guid));
+        EntityNotificationV1 entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME,
+                newNotificationPredicate(EntityNotificationV1.OperationType.TRAIT_DELETE, HIVE_TABLE_TYPE_BUILTIN, guid));
 
         assertFalse(entityNotification.getEntity().getTraitNames().contains(traitName));
     }
@@ -177,11 +181,15 @@ public class EntityNotificationIT extends BaseResourceIT {
     // ----- helper methods ---------------------------------------------------
 
     private void createTrait(String traitName, String ... superTraitNames) throws Exception {
-        TraitTypeDefinition trait =
-            TypesUtil.createTraitTypeDef(traitName, null, ImmutableSet.copyOf(superTraitNames));
+        TraitTypeDefinition traitDef = TypesUtil.createTraitTypeDef(traitName, null, ImmutableSet.copyOf(superTraitNames));
+        TypesDef            typesDef = new TypesDef(Collections.<EnumTypeDefinition>emptyList(),
+                                                    Collections.<StructTypeDefinition>emptyList(),
+                                                    Collections.singletonList(traitDef),
+                                                    Collections.<ClassTypeDefinition>emptyList());
+        String traitDefinitionJSON = AtlasType.toV1Json(typesDef);
 
-        String traitDefinitionJSON = AtlasType.toV1Json(trait);
         LOG.debug("Trait definition = {}", traitDefinitionJSON);
+
         createType(traitDefinitionJSON);
     }
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
index 1f045e4..f248593 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
@@ -20,14 +20,13 @@ package org.apache.atlas.notification;
 
 import org.apache.atlas.EntityAuditEvent;
 import org.apache.atlas.kafka.NotificationProvider;
+import org.apache.atlas.model.notification.HookNotification;
 import org.apache.atlas.v1.model.instance.Id;
 import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.v1.model.notification.HookNotification;
-import org.apache.atlas.v1.model.notification.HookNotification.HookNotificationMessage;
-import org.apache.atlas.v1.model.notification.HookNotification.EntityDeleteRequest;
-import org.apache.atlas.v1.model.notification.HookNotification.EntityPartialUpdateRequest;
-import org.apache.atlas.v1.model.notification.HookNotification.EntityCreateRequest;
-import org.apache.atlas.v1.model.notification.HookNotification.EntityUpdateRequest;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityDeleteRequest;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityPartialUpdateRequest;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityUpdateRequest;
 import org.apache.atlas.web.integration.BaseResourceIT;
 import org.codehaus.jettison.json.JSONArray;
 import org.testng.annotations.AfterClass;
@@ -40,18 +39,19 @@ import static java.lang.Thread.sleep;
 import static org.testng.Assert.assertEquals;
 
 public class NotificationHookConsumerIT extends BaseResourceIT {
-
     private static final String TEST_USER = "testuser";
-    public static final String NAME = "name";
-    public static final String DESCRIPTION = "description";
+
+    public static final String NAME           = "name";
+    public static final String DESCRIPTION    = "description";
     public static final String QUALIFIED_NAME = "qualifiedName";
-    public static final String CLUSTER_NAME = "clusterName";
+    public static final String CLUSTER_NAME   = "clusterName";
 
-    private NotificationInterface notificationInterface = NotificationProvider.get();
+    private final NotificationInterface notificationInterface = NotificationProvider.get();
 
     @BeforeClass
     public void setUp() throws Exception {
         super.setUp();
+
         createTypeDefinitionsV1();
     }
 
@@ -60,29 +60,33 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
         notificationInterface.close();
     }
 
-    private void sendHookMessage(HookNotificationMessage message) throws NotificationException, InterruptedException {
+    private void sendHookMessage(HookNotification message) throws NotificationException, InterruptedException {
         notificationInterface.send(NotificationInterface.NotificationType.HOOK, message);
+
         sleep(1000);
     }
 
     @Test
     public void testMessageHandleFailureConsumerContinues() throws Exception {
         //send invalid message - update with invalid type
-        sendHookMessage(new HookNotification.EntityPartialUpdateRequest(TEST_USER, randomString(), null, null,
-                new Referenceable(randomString())));
+        sendHookMessage(new EntityPartialUpdateRequest(TEST_USER, randomString(), null, null, new Referenceable(randomString())));
 
         //send valid message
         final Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN);
-        String dbName = "db" + randomString();
+        final String        dbName = "db" + randomString();
+
         entity.set(NAME, dbName);
         entity.set(DESCRIPTION, randomString());
         entity.set(QUALIFIED_NAME, dbName);
         entity.set(CLUSTER_NAME, randomString());
+
         sendHookMessage(new EntityCreateRequest(TEST_USER, entity));
+
         waitFor(MAX_WAIT_TIME, new Predicate() {
             @Override
             public boolean evaluate() throws Exception {
                 JSONArray results = searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE_BUILTIN, entity.get(NAME)));
+
                 return results.length() == 1;
             }
         });
@@ -91,24 +95,28 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
     @Test
     public void testCreateEntity() throws Exception {
         final Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN);
-        String dbName = "db" + randomString();
+        final String        dbName = "db" + randomString();
+
         entity.set(NAME, dbName);
         entity.set(DESCRIPTION, randomString());
         entity.set(QUALIFIED_NAME, dbName);
         entity.set(CLUSTER_NAME, randomString());
 
         sendHookMessage(new EntityCreateRequest(TEST_USER, entity));
+
         waitFor(MAX_WAIT_TIME, new Predicate() {
             @Override
             public boolean evaluate() throws Exception {
                 JSONArray results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, entity.get(QUALIFIED_NAME)));
+
                 return results.length() == 1;
             }
         });
 
         //Assert that user passed in hook message is used in audit
-        Referenceable instance = atlasClientV1.getEntity(DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, (String) entity.get(QUALIFIED_NAME));
-        List<EntityAuditEvent> events = atlasClientV1.getEntityAuditEvents(instance.getId()._getId(), (short) 1);
+        Referenceable          instance = atlasClientV1.getEntity(DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, (String) entity.get(QUALIFIED_NAME));
+        List<EntityAuditEvent> events   = atlasClientV1.getEntityAuditEvents(instance.getId()._getId(), (short) 1);
+
         assertEquals(events.size(), 1);
         assertEquals(events.get(0).getUser(), TEST_USER);
     }
@@ -116,7 +124,8 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
     @Test
     public void testUpdateEntityPartial() throws Exception {
         final Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN);
-        final String dbName = "db" + randomString();
+        final String        dbName = "db" + randomString();
+
         entity.set(NAME, dbName);
         entity.set(DESCRIPTION, randomString());
         entity.set(QUALIFIED_NAME, dbName);
@@ -125,25 +134,31 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
         atlasClientV1.createEntity(entity);
 
         final Referenceable newEntity = new Referenceable(DATABASE_TYPE_BUILTIN);
+
         newEntity.set("owner", randomString());
+
         sendHookMessage(new EntityPartialUpdateRequest(TEST_USER, DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName, newEntity));
+
         waitFor(MAX_WAIT_TIME, new Predicate() {
             @Override
             public boolean evaluate() throws Exception {
                 Referenceable localEntity = atlasClientV1.getEntity(DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName);
+
                 return (localEntity.get("owner") != null && localEntity.get("owner").equals(newEntity.get("owner")));
             }
         });
 
         //Its partial update and un-set fields are not updated
         Referenceable actualEntity = atlasClientV1.getEntity(DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName);
+
         assertEquals(actualEntity.get(DESCRIPTION), entity.get(DESCRIPTION));
     }
 
     @Test
     public void testUpdatePartialUpdatingQualifiedName() throws Exception {
         final Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN);
-        final String dbName = "db" + randomString();
+        final String        dbName = "db" + randomString();
+
         entity.set(NAME, dbName);
         entity.set(DESCRIPTION, randomString());
         entity.set(QUALIFIED_NAME, dbName);
@@ -152,28 +167,32 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
         atlasClientV1.createEntity(entity);
 
         final Referenceable newEntity = new Referenceable(DATABASE_TYPE_BUILTIN);
-        final String newName = "db" + randomString();
+        final String        newName   = "db" + randomString();
+
         newEntity.set(QUALIFIED_NAME, newName);
 
         sendHookMessage(new EntityPartialUpdateRequest(TEST_USER, DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName, newEntity));
+
         waitFor(MAX_WAIT_TIME, new Predicate() {
             @Override
             public boolean evaluate() throws Exception {
                 JSONArray results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, newName));
+
                 return results.length() == 1;
             }
         });
 
         //no entity with the old qualified name
         JSONArray results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, dbName));
-        assertEquals(results.length(), 0);
 
+        assertEquals(results.length(), 0);
     }
 
     @Test
     public void testDeleteByQualifiedName() throws Exception {
-        Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN);
-        final String dbName = "db" + randomString();
+        final Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN);
+        final String        dbName = "db" + randomString();
+
         entity.set(NAME, dbName);
         entity.set(DESCRIPTION, randomString());
         entity.set(QUALIFIED_NAME, dbName);
@@ -182,10 +201,12 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
         final String dbId = atlasClientV1.createEntity(entity).get(0);
 
         sendHookMessage(new EntityDeleteRequest(TEST_USER, DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName));
+
         waitFor(MAX_WAIT_TIME, new Predicate() {
             @Override
             public boolean evaluate() throws Exception {
                 Referenceable getEntity = atlasClientV1.getEntity(dbId);
+
                 return getEntity.getId().getState() == Id.EntityState.DELETED;
             }
         });
@@ -193,8 +214,9 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
 
     @Test
     public void testUpdateEntityFullUpdate() throws Exception {
-        Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN);
-        final String dbName = "db" + randomString();
+        final Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN);
+        final String        dbName = "db" + randomString();
+
         entity.set(NAME, dbName);
         entity.set(DESCRIPTION, randomString());
         entity.set(QUALIFIED_NAME, dbName);
@@ -203,6 +225,7 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
         atlasClientV1.createEntity(entity);
 
         final Referenceable newEntity = new Referenceable(DATABASE_TYPE_BUILTIN);
+
         newEntity.set(NAME, randomString());
         newEntity.set(DESCRIPTION, randomString());
         newEntity.set("owner", randomString());
@@ -211,18 +234,19 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
 
         //updating unique attribute
         sendHookMessage(new EntityUpdateRequest(TEST_USER, newEntity));
+
         waitFor(MAX_WAIT_TIME, new Predicate() {
             @Override
             public boolean evaluate() throws Exception {
                 JSONArray results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, newEntity.get(QUALIFIED_NAME)));
+
                 return results.length() == 1;
             }
         });
 
         Referenceable actualEntity = atlasClientV1.getEntity(DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName);
+
         assertEquals(actualEntity.get(DESCRIPTION), newEntity.get(DESCRIPTION));
         assertEquals(actualEntity.get("owner"), newEntity.get("owner"));
     }
-
-
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index 68497e0..4ea13c7 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -25,9 +25,10 @@ import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.kafka.AtlasKafkaMessage;
 import org.apache.atlas.kafka.KafkaNotification;
 import org.apache.atlas.kafka.NotificationProvider;
-import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.model.notification.HookNotification;
 import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.v1.model.notification.HookNotification;
+import org.apache.atlas.v1.model.notification.HookNotificationV1;
 import org.apache.atlas.repository.converters.AtlasInstanceConverter;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.repository.store.graph.v1.EntityStream;
@@ -41,7 +42,7 @@ import org.testng.Assert;
 import org.testng.annotations.AfterTest;
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
-import static org.apache.atlas.v1.model.notification.HookNotification.HookNotificationMessage;
+
 import java.util.List;
 
 import org.apache.atlas.kafka.AtlasKafkaConsumer;
@@ -57,11 +58,11 @@ import static org.testng.Assert.*;
 
 
 public class NotificationHookConsumerKafkaTest {
-
-    public static final String NAME = "name";
-    public static final String DESCRIPTION = "description";
+    public static final String NAME           = "name";
+    public static final String DESCRIPTION    = "description";
     public static final String QUALIFIED_NAME = "qualifiedName";
-    private NotificationInterface notificationInterface = NotificationProvider.get();
+
+    private final NotificationInterface notificationInterface = NotificationProvider.get();
 
 
     @Mock
@@ -81,10 +82,14 @@ public class NotificationHookConsumerKafkaTest {
     @BeforeTest
     public void setup() throws AtlasException, InterruptedException, AtlasBaseException {
         MockitoAnnotations.initMocks(this);
-        AtlasType mockType = mock(AtlasType.class);
+
+        AtlasType                mockType   = mock(AtlasType.class);
+        AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntitiesWithExtInfo.class);
+
         when(typeRegistry.getType(anyString())).thenReturn(mockType);
-        AtlasEntity.AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntity.AtlasEntitiesWithExtInfo.class);
+
         when(instanceConverter.toAtlasEntities(anyList())).thenReturn(mockEntity);
+
         kafkaNotification = startKafkaServer();
     }
 
@@ -97,19 +102,20 @@ public class NotificationHookConsumerKafkaTest {
     @Test
     public void testConsumerConsumesNewMessageWithAutoCommitDisabled() throws AtlasException, InterruptedException, AtlasBaseException {
         try {
-            produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity()));
+            produceMessage(new HookNotificationV1.EntityCreateRequest("test_user1", createEntity()));
 
-            NotificationConsumer<HookNotificationMessage> consumer = createNewConsumer(kafkaNotification, false);
-            NotificationHookConsumer notificationHookConsumer =
-                    new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
-            NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
+            NotificationConsumer<HookNotification> consumer                 = createNewConsumer(kafkaNotification, false);
+            NotificationHookConsumer               notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+            NotificationHookConsumer.HookConsumer  hookConsumer             = notificationHookConsumer.new HookConsumer(consumer);
 
             consumeOneMessage(consumer, hookConsumer);
+
             verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean());
 
             // produce another message, and make sure it moves ahead. If commit succeeded, this would work.
-            produceMessage(new HookNotification.EntityCreateRequest("test_user2", createEntity()));
+            produceMessage(new HookNotificationV1.EntityCreateRequest("test_user2", createEntity()));
             consumeOneMessage(consumer, hookConsumer);
+
             verify(atlasEntityStore,times(2)).createOrUpdate(any(EntityStream.class), anyBoolean());
             reset(atlasEntityStore);
         }
@@ -121,22 +127,20 @@ public class NotificationHookConsumerKafkaTest {
     @Test(dependsOnMethods = "testConsumerConsumesNewMessageWithAutoCommitDisabled")
     public void testConsumerRemainsAtSameMessageWithAutoCommitEnabled() throws Exception {
         try {
-            produceMessage(new HookNotification.EntityCreateRequest("test_user3", createEntity()));
+            produceMessage(new HookNotificationV1.EntityCreateRequest("test_user3", createEntity()));
 
-            NotificationConsumer<HookNotificationMessage> consumer = createNewConsumer(kafkaNotification, true);
+            NotificationConsumer<HookNotification> consumer = createNewConsumer(kafkaNotification, true);
 
             assertNotNull (consumer);
 
-            NotificationHookConsumer notificationHookConsumer =
-                    new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
-            NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
-
+            NotificationHookConsumer              notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+            NotificationHookConsumer.HookConsumer hookConsumer             = notificationHookConsumer.new HookConsumer(consumer);
 
             consumeOneMessage(consumer, hookConsumer);
             verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean());
 
             // produce another message, but this will not be consumed, as commit code is not executed in hook consumer.
-            produceMessage(new HookNotification.EntityCreateRequest("test_user4", createEntity()));
+            produceMessage(new HookNotificationV1.EntityCreateRequest("test_user4", createEntity()));
 
             consumeOneMessage(consumer, hookConsumer);
             verify(atlasEntityStore,times(2)).createOrUpdate(any(EntityStream.class), anyBoolean());
@@ -146,18 +150,19 @@ public class NotificationHookConsumerKafkaTest {
         }
     }
 
-    AtlasKafkaConsumer<HookNotificationMessage> createNewConsumer(KafkaNotification kafkaNotification, boolean autoCommitEnabled) {
+    AtlasKafkaConsumer<HookNotification> createNewConsumer(KafkaNotification kafkaNotification, boolean autoCommitEnabled) {
         return (AtlasKafkaConsumer) kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1, autoCommitEnabled).get(0);
     }
 
-    void consumeOneMessage(NotificationConsumer<HookNotificationMessage> consumer,
+    void consumeOneMessage(NotificationConsumer<HookNotification> consumer,
                            NotificationHookConsumer.HookConsumer hookConsumer) throws InterruptedException {
         try {
             long startTime = System.currentTimeMillis(); //fetch starting time
+
             while ((System.currentTimeMillis() - startTime) < 10000) {
-                List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive();
+                List<AtlasKafkaMessage<HookNotification>> messages = consumer.receive();
 
-                for (AtlasKafkaMessage<HookNotificationMessage> msg : messages) {
+                for (AtlasKafkaMessage<HookNotification> msg : messages) {
                     hookConsumer.handleMessage(msg);
                 }
 
@@ -172,19 +177,25 @@ public class NotificationHookConsumerKafkaTest {
 
     Referenceable createEntity() {
         final Referenceable entity = new Referenceable(AtlasClient.DATA_SET_SUPER_TYPE);
+
         entity.set(NAME, "db" + randomString());
         entity.set(DESCRIPTION, randomString());
         entity.set(QUALIFIED_NAME, randomString());
+
         return entity;
     }
 
     KafkaNotification startKafkaServer() throws AtlasException, InterruptedException {
         Configuration applicationProperties = ApplicationProperties.get();
+
         applicationProperties.setProperty("atlas.kafka.data", "target/" + RandomStringUtils.randomAlphanumeric(5));
 
         kafkaNotification = new KafkaNotification(applicationProperties);
+
         kafkaNotification.start();
+
         Thread.sleep(2000);
+
         return kafkaNotification;
     }
 
@@ -192,8 +203,7 @@ public class NotificationHookConsumerKafkaTest {
         return RandomStringUtils.randomAlphanumeric(10);
     }
 
-    private void produceMessage(HookNotificationMessage message) throws NotificationException {
+    private void produceMessage(HookNotification message) throws NotificationException {
         kafkaNotification.send(NotificationInterface.NotificationType.HOOK, message);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
index 2d3d5ba..f8bd9a1 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -22,10 +22,12 @@ import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.ha.HAConfiguration;
 import org.apache.atlas.kafka.AtlasKafkaMessage;
-import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.notification.HookNotification.HookNotificationType;
+import org.apache.atlas.notification.NotificationInterface.NotificationType;
 import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.v1.model.notification.HookNotification;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
 import org.apache.atlas.repository.converters.AtlasInstanceConverter;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.repository.store.graph.v1.EntityStream;
@@ -43,6 +45,7 @@ import org.testng.annotations.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 
@@ -75,20 +78,24 @@ public class NotificationHookConsumerTest {
     @BeforeMethod
     public void setup() throws AtlasBaseException {
         MockitoAnnotations.initMocks(this);
-        AtlasType mockType = mock(AtlasType.class);
+
+        AtlasType                mockType   = mock(AtlasType.class);
+        AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntitiesWithExtInfo.class);
+
         when(typeRegistry.getType(anyString())).thenReturn(mockType);
-        AtlasEntity.AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntity.AtlasEntitiesWithExtInfo.class);
         when(instanceConverter.toAtlasEntities(anyList())).thenReturn(mockEntity);
+
         EntityMutationResponse mutationResponse = mock(EntityMutationResponse.class);
+
         when(atlasEntityStore.createOrUpdate(any(EntityStream.class), anyBoolean())).thenReturn(mutationResponse);
     }
 
     @Test
     public void testConsumerCanProceedIfServerIsReady() throws Exception {
-        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
-        NotificationHookConsumer.HookConsumer hookConsumer =
-                notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
-        NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
+        NotificationHookConsumer              notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+        NotificationHookConsumer.HookConsumer hookConsumer             = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
+        NotificationHookConsumer.Timer        timer                    = mock(NotificationHookConsumer.Timer.class);
+
         when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
 
         assertTrue(hookConsumer.serverAvailable(timer));
@@ -98,10 +105,9 @@ public class NotificationHookConsumerTest {
 
     @Test
     public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws Exception {
-        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
-        NotificationHookConsumer.HookConsumer hookConsumer =
-                notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
-        NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
+        NotificationHookConsumer              notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+        NotificationHookConsumer.HookConsumer hookConsumer             = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
+        NotificationHookConsumer.Timer        timer                    = mock(NotificationHookConsumer.Timer.class);
 
         when(serviceState.getState())
                 .thenReturn(ServiceState.ServiceStateValue.PASSIVE)
@@ -116,35 +122,30 @@ public class NotificationHookConsumerTest {
 
     @Test
     public void testCommitIsCalledWhenMessageIsProcessed() throws AtlasServiceException, AtlasException {
-        NotificationHookConsumer notificationHookConsumer =
-                new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
-        NotificationConsumer consumer = mock(NotificationConsumer.class);
-        NotificationHookConsumer.HookConsumer hookConsumer =
-                notificationHookConsumer.new HookConsumer(consumer);
-        HookNotification.EntityCreateRequest message = mock(HookNotification.EntityCreateRequest.class);
+        NotificationHookConsumer               notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+        NotificationConsumer                   consumer                 = mock(NotificationConsumer.class);
+        NotificationHookConsumer.HookConsumer  hookConsumer             = notificationHookConsumer.new HookConsumer(consumer);
+        EntityCreateRequest                    message                  = mock(EntityCreateRequest.class);
+        Referenceable                          mock                     = mock(Referenceable.class);
+
         when(message.getUser()).thenReturn("user");
-        when(message.getType()).thenReturn(HookNotification.HookNotificationType.ENTITY_CREATE);
-        Referenceable mock = mock(Referenceable.class);
+        when(message.getType()).thenReturn(HookNotificationType.ENTITY_CREATE);
         when(message.getEntities()).thenReturn(Arrays.asList(mock));
 
         hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, -1));
+
         verify(consumer).commit(any(TopicPartition.class), anyInt());
     }
 
     @Test
     public void testCommitIsNotCalledEvenWhenMessageProcessingFails() throws AtlasServiceException, AtlasException, AtlasBaseException {
-        NotificationHookConsumer notificationHookConsumer =
-                new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
-        NotificationConsumer consumer = mock(NotificationConsumer.class);
-        NotificationHookConsumer.HookConsumer hookConsumer =
-                notificationHookConsumer.new HookConsumer(consumer);
-        HookNotification.EntityCreateRequest message = new HookNotification.EntityCreateRequest("user",
-                new ArrayList<Referenceable>() {
-                    {
-                        add(mock(Referenceable.class));
-                    }
-                });
+        NotificationHookConsumer              notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+        NotificationConsumer                  consumer                 = mock(NotificationConsumer.class);
+        NotificationHookConsumer.HookConsumer hookConsumer             = notificationHookConsumer.new HookConsumer(consumer);
+        EntityCreateRequest                   message                  = new EntityCreateRequest("user", Collections.singletonList(mock(Referenceable.class)));
+
         when(atlasEntityStore.createOrUpdate(any(EntityStream.class), anyBoolean())).thenThrow(new RuntimeException("Simulating exception in processing message"));
+
         hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, -1));
 
         verifyZeroInteractions(consumer);
@@ -152,10 +153,10 @@ public class NotificationHookConsumerTest {
 
     @Test
     public void testConsumerProceedsWithFalseIfInterrupted() throws Exception {
-        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
-        NotificationHookConsumer.HookConsumer hookConsumer =
-                notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
-        NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
+        NotificationHookConsumer              notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+        NotificationHookConsumer.HookConsumer hookConsumer             = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
+        NotificationHookConsumer.Timer        timer                    = mock(NotificationHookConsumer.Timer.class);
+
         doThrow(new InterruptedException()).when(timer).sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS);
         when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
 
@@ -164,58 +165,75 @@ public class NotificationHookConsumerTest {
 
     @Test
     public void testConsumersStartedIfHAIsDisabled() throws Exception {
+        List<NotificationConsumer<Object>> consumers = new ArrayList();
+        NotificationConsumer               notificationConsumerMock = mock(NotificationConsumer.class);
+
+        consumers.add(notificationConsumerMock);
+
         when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false);
         when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
-        List<NotificationConsumer<Object>> consumers = new ArrayList();
-        consumers.add(mock(NotificationConsumer.class));
-        when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
-                thenReturn(consumers);
+        when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
+
         NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+
         notificationHookConsumer.startInternal(configuration, executorService);
-        verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1);
+
+        verify(notificationInterface).createConsumers(NotificationType.HOOK, 1);
         verify(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
     }
 
     @Test
     public void testConsumersAreNotStartedIfHAIsEnabled() throws Exception {
+        List<NotificationConsumer<Object>> consumers = new ArrayList();
+        NotificationConsumer               notificationConsumerMock = mock(NotificationConsumer.class);
+
+        consumers.add(notificationConsumerMock);
+
         when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
         when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
         when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
-        List<NotificationConsumer<Object>> consumers = new ArrayList();
-        consumers.add(mock(NotificationConsumer.class));
-        when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
-                thenReturn(consumers);
+        when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
+
         NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+
         notificationHookConsumer.startInternal(configuration, executorService);
+
         verifyZeroInteractions(notificationInterface);
     }
 
     @Test
     public void testConsumersAreStartedWhenInstanceBecomesActive() throws Exception {
+        List<NotificationConsumer<Object>> consumers = new ArrayList();
+        NotificationConsumer               notificationConsumerMock = mock(NotificationConsumer.class);
+
+        consumers.add(notificationConsumerMock);
+
         when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
         when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
         when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
-        List<NotificationConsumer<Object>> consumers = new ArrayList();
-        consumers.add(mock(NotificationConsumer.class));
-        when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
-                thenReturn(consumers);
+        when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
+
         NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+
         notificationHookConsumer.startInternal(configuration, executorService);
         notificationHookConsumer.instanceIsActive();
-        verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1);
+
+        verify(notificationInterface).createConsumers(NotificationType.HOOK, 1);
         verify(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
     }
 
     @Test
     public void testConsumersAreStoppedWhenInstanceBecomesPassive() throws Exception {
+        List<NotificationConsumer<Object>> consumers = new ArrayList();
+        NotificationConsumer               notificationConsumerMock = mock(NotificationConsumer.class);
+
+        consumers.add(notificationConsumerMock);
+
         when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
         when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
         when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
-        List<NotificationConsumer<Object>> consumers = new ArrayList();
-        NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class);
-        consumers.add(notificationConsumerMock);
+        when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
 
-        when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).thenReturn(consumers);
         final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
 
         doAnswer(new Answer() {
@@ -223,12 +241,14 @@ public class NotificationHookConsumerTest {
             public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                 notificationHookConsumer.consumers.get(0).start();
                 Thread.sleep(500);
+
                 return null;
             }
         }).when(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
 
         notificationHookConsumer.startInternal(configuration, executorService);
         notificationHookConsumer.instanceIsPassive();
+
         verify(notificationInterface).close();
         verify(executorService).shutdown();
         verify(notificationConsumerMock).wakeup();
@@ -236,18 +256,21 @@ public class NotificationHookConsumerTest {
 
     @Test
     public void consumersStoppedBeforeStarting() throws Exception {
+        List<NotificationConsumer<Object>> consumers                = new ArrayList();
+        NotificationConsumer               notificationConsumerMock = mock(NotificationConsumer.class);
+
+        consumers.add(notificationConsumerMock);
+
         when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
         when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
         when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
-        List<NotificationConsumer<Object>> consumers = new ArrayList();
-        NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class);
-        consumers.add(notificationConsumerMock);
+        when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
 
-        when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).thenReturn(consumers);
         final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
 
         notificationHookConsumer.startInternal(configuration, executorService);
         notificationHookConsumer.instanceIsPassive();
+
         verify(notificationInterface).close();
         verify(executorService).shutdown();
     }
@@ -261,13 +284,16 @@ public class NotificationHookConsumerTest {
             public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                 notificationHookConsumer.consumers.get(0).start();
                 Thread.sleep(1000);
+
                 return null;
             }
         }).when(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
 
         notificationHookConsumer.startInternal(configuration, executorService);
         Thread.sleep(1000);
+
         assertTrue(notificationHookConsumer.consumers.get(0).isAlive());
+
         notificationHookConsumer.consumers.get(0).shutdown();
     }
 
@@ -280,27 +306,32 @@ public class NotificationHookConsumerTest {
             public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                 notificationHookConsumer.consumers.get(0).start();
                 Thread.sleep(500);
+
                 return null;
             }
         }).when(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
 
         notificationHookConsumer.startInternal(configuration, executorService);
         Thread.sleep(500);
+
         notificationHookConsumer.consumers.get(0).shutdown();
         Thread.sleep(500);
+
         assertFalse(notificationHookConsumer.consumers.get(0).isAlive());
     }
 
     private NotificationHookConsumer setupNotificationHookConsumer() throws AtlasException {
+        List<NotificationConsumer<Object>> consumers                = new ArrayList();
+        NotificationConsumer               notificationConsumerMock = mock(NotificationConsumer.class);
+
+        consumers.add(notificationConsumerMock);
+
         when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
         when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
         when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
-        List<NotificationConsumer<Object>> consumers = new ArrayList();
-        NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class);
         when(notificationConsumerMock.receive()).thenThrow(new IllegalStateException());
-        consumers.add(notificationConsumerMock);
+        when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
 
-        when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).thenReturn(consumers);
         return new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
     }
 }