You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2016/12/23 01:46:46 UTC

[17/50] [abbrv] incubator-atlas git commit: ATLAS-1111 Data loss is observed when atlas is restarted while hive_table metadata ingestion into kafka topic is in-progress (shwethags via sumasai)

ATLAS-1111 Data loss is observed when atlas is restarted while hive_table metadata ingestion into kafka topic is in-progress (shwethags via sumasai)

(cherry picked from commit 30893c5e5be5a1c3a2f104d554cc3772a6ef7b81)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/9ea1ad6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/9ea1ad6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/9ea1ad6d

Branch: refs/heads/0.7-incubating
Commit: 9ea1ad6d084b593d5ed88fd4b191c03907f49de1
Parents: f1e8906
Author: Suma Shivaprasad <su...@gmail.com>
Authored: Fri Aug 12 14:03:55 2016 -0700
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Thu Dec 22 15:17:17 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/atlas/service/Services.java |   4 +-
 distro/src/conf/atlas-log4j.xml                 |  13 ++
 .../org/apache/atlas/kafka/KafkaConsumer.java   |   5 +
 .../notification/NotificationConsumer.java      |   2 +
 .../notification/hook/HookNotification.java     |   2 +-
 .../atlas/kafka/KafkaNotificationMockTest.java  | 198 ++++++++++++++++
 .../atlas/kafka/KafkaNotificationTest.java      | 233 ++++++-------------
 .../AbstractNotificationConsumerTest.java       |   5 +
 .../classloader/AtlasPluginClassLoader.java     |  32 +--
 release-log.txt                                 |   1 +
 .../atlas/GraphTransactionInterceptor.java      |   2 +-
 typesystem/src/main/resources/atlas-log4j.xml   |  13 ++
 .../test/resources/atlas-application.properties |   4 +-
 .../notification/NotificationHookConsumer.java  | 113 ++++++---
 .../atlas/web/listeners/GuiceServletConfig.java |  13 +-
 webapp/src/main/webapp/WEB-INF/web.xml          |   8 +-
 .../NotificationHookConsumerKafkaTest.java      |  19 +-
 .../NotificationHookConsumerTest.java           |  31 ++-
 18 files changed, 444 insertions(+), 254 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/common/src/main/java/org/apache/atlas/service/Services.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/atlas/service/Services.java b/common/src/main/java/org/apache/atlas/service/Services.java
index 8b2e205..588dd8e 100644
--- a/common/src/main/java/org/apache/atlas/service/Services.java
+++ b/common/src/main/java/org/apache/atlas/service/Services.java
@@ -41,7 +41,7 @@ public class Services {
     public void start() {
         try {
             for (Service service : services) {
-                LOG.debug("Starting service {}", service.getClass().getName());
+                LOG.info("Starting service {}", service.getClass().getName());
                 service.start();
             }
         } catch (Exception e) {
@@ -51,7 +51,7 @@ public class Services {
 
     public void stop() {
         for (Service service : services) {
-            LOG.debug("Stopping service {}", service.getClass().getName());
+            LOG.info("Stopping service {}", service.getClass().getName());
             try {
                 service.stop();
             } catch (Throwable e) {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/distro/src/conf/atlas-log4j.xml
----------------------------------------------------------------------
diff --git a/distro/src/conf/atlas-log4j.xml b/distro/src/conf/atlas-log4j.xml
index eaa4ec5..400cd3a 100755
--- a/distro/src/conf/atlas-log4j.xml
+++ b/distro/src/conf/atlas-log4j.xml
@@ -43,6 +43,14 @@
         </layout>
     </appender>
 
+    <appender name="FAILED" class="org.apache.log4j.DailyRollingFileAppender">
+        <param name="File" value="${atlas.log.dir}/failed.log"/>
+        <param name="Append" value="true"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %m%n"/> <!-- %n ends each record; PatternLayout adds no newline itself -->
+        </layout>
+    </appender>
+
     <logger name="org.apache.atlas" additivity="false">
         <level value="info"/>
         <appender-ref ref="FILE"/>
@@ -80,6 +88,11 @@
         <appender-ref ref="AUDIT"/>
     </logger>
 
+    <logger name="FAILED" additivity="false">
+        <level value="info"/>
+        <appender-ref ref="FAILED"/> <!-- write to failed.log via the FAILED appender, not the audit log -->
+    </logger>
+
     <root>
         <priority value="warn"/>
         <appender-ref ref="FILE"/>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
index 270215d..16c0eb2 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
@@ -96,4 +96,9 @@ public class KafkaConsumer<T> extends AbstractNotificationConsumer<T> {
             LOG.debug("Committed offset: {}", lastSeenOffset);
         }
     }
+
+    @Override
+    public void close() {
+        consumerConnector.shutdown();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
index 2e861cb..a99cb10 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
@@ -52,4 +52,6 @@ public interface NotificationConsumer<T> {
      * restart.
      */
     void commit();
+
+    void close();
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java b/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java
index 88a0322..a25aa52 100644
--- a/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java
+++ b/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java
@@ -156,7 +156,7 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN
             }
         }
 
-        public List<Referenceable> getEntities() throws JSONException {
+        public List<Referenceable> getEntities() {
             return entities;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
new file mode 100644
index 0000000..2126be6
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.kafka;
+
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.serializer.StringDecoder;
+import org.apache.atlas.notification.MessageDeserializer;
+import org.apache.atlas.notification.NotificationConsumer;
+import org.apache.atlas.notification.NotificationException;
+import org.apache.atlas.notification.NotificationInterface;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+public class KafkaNotificationMockTest {
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testCreateConsumers() throws Exception {
+        Properties properties = mock(Properties.class);
+        when(properties.getProperty("entities.group.id")).thenReturn("atlas");
+        final ConsumerConnector consumerConnector = mock(ConsumerConnector.class);
+        Map<String, Integer> topicCountMap = new HashMap<>();
+        topicCountMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, 1);
+
+        Map<String, List<KafkaStream<String, String>>> kafkaStreamsMap =
+                new HashMap<>();
+        List<KafkaStream<String, String>> kafkaStreams = new ArrayList<>();
+        KafkaStream kafkaStream = mock(KafkaStream.class);
+        kafkaStreams.add(kafkaStream);
+        kafkaStreamsMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, kafkaStreams);
+
+        when(consumerConnector.createMessageStreams(
+                eq(topicCountMap), any(StringDecoder.class), any(StringDecoder.class))).thenReturn(kafkaStreamsMap);
+
+        final KafkaConsumer consumer1 = mock(KafkaConsumer.class);
+        final KafkaConsumer consumer2 = mock(KafkaConsumer.class);
+
+        KafkaNotification kafkaNotification =
+                new TestKafkaNotification(properties, consumerConnector, consumer1, consumer2);
+
+        List<NotificationConsumer<String>> consumers =
+                kafkaNotification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 2);
+
+        verify(consumerConnector, times(2)).createMessageStreams(
+                eq(topicCountMap), any(StringDecoder.class), any(StringDecoder.class));
+        assertEquals(consumers.size(), 2);
+        assertTrue(consumers.contains(consumer1));
+        assertTrue(consumers.contains(consumer2));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldSendMessagesSuccessfully() throws NotificationException,
+            ExecutionException, InterruptedException {
+        Properties configProperties = mock(Properties.class);
+        KafkaNotification kafkaNotification = new KafkaNotification(configProperties);
+
+        Producer producer = mock(Producer.class);
+        String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK);
+        String message = "This is a test message";
+        Future returnValue = mock(Future.class);
+        when(returnValue.get()).thenReturn(new RecordMetadata(new TopicPartition(topicName, 0), 0, 0));
+        ProducerRecord expectedRecord = new ProducerRecord(topicName, message);
+        when(producer.send(expectedRecord)).thenReturn(returnValue);
+
+        kafkaNotification.sendInternalToProducer(producer,
+                NotificationInterface.NotificationType.HOOK, new String[]{message});
+
+        verify(producer).send(expectedRecord);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldThrowExceptionIfProducerFails() throws NotificationException,
+            ExecutionException, InterruptedException {
+        Properties configProperties = mock(Properties.class);
+        KafkaNotification kafkaNotification = new KafkaNotification(configProperties);
+
+        Producer producer = mock(Producer.class);
+        String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK);
+        String message = "This is a test message";
+        Future returnValue = mock(Future.class);
+        when(returnValue.get()).thenThrow(new RuntimeException("Simulating exception"));
+        ProducerRecord expectedRecord = new ProducerRecord(topicName, message);
+        when(producer.send(expectedRecord)).thenReturn(returnValue);
+
+        try {
+            kafkaNotification.sendInternalToProducer(producer,
+                NotificationInterface.NotificationType.HOOK, new String[]{message});
+            fail("Should have thrown NotificationException");
+        } catch (NotificationException e) {
+            assertEquals(e.getFailedMessages().size(), 1);
+            assertEquals(e.getFailedMessages().get(0), "This is a test message");
+        }
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldCollectAllFailedMessagesIfProducerFails() throws NotificationException,
+            ExecutionException, InterruptedException {
+        Properties configProperties = mock(Properties.class);
+        KafkaNotification kafkaNotification = new KafkaNotification(configProperties);
+
+        Producer producer = mock(Producer.class);
+        String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK);
+        String message1 = "This is a test message1";
+        String message2 = "This is a test message2";
+        Future returnValue1 = mock(Future.class);
+        when(returnValue1.get()).thenThrow(new RuntimeException("Simulating exception"));
+        Future returnValue2 = mock(Future.class);
+        when(returnValue2.get()).thenThrow(new RuntimeException("Simulating exception"));
+        ProducerRecord expectedRecord1 = new ProducerRecord(topicName, message1);
+        when(producer.send(expectedRecord1)).thenReturn(returnValue1);
+        ProducerRecord expectedRecord2 = new ProducerRecord(topicName, message2);
+        when(producer.send(expectedRecord2)).thenReturn(returnValue2); // each record gets its own failing future
+
+        try {
+            kafkaNotification.sendInternalToProducer(producer,
+                    NotificationInterface.NotificationType.HOOK, new String[]{message1, message2});
+            fail("Should have thrown NotificationException");
+        } catch (NotificationException e) {
+            // both sends fail, so both messages must be reported, in send order
+            assertEquals(e.getFailedMessages().size(), 2);
+            assertEquals(e.getFailedMessages().get(0), "This is a test message1");
+            assertEquals(e.getFailedMessages().get(1), "This is a test message2");
+        }
+    }
+
+    class TestKafkaNotification extends KafkaNotification {
+
+        private final ConsumerConnector consumerConnector;
+        private final KafkaConsumer consumer1;
+        private final KafkaConsumer consumer2;
+
+        TestKafkaNotification(Properties properties, ConsumerConnector consumerConnector,
+                              KafkaConsumer consumer1, KafkaConsumer consumer2) {
+            super(properties);
+            this.consumerConnector = consumerConnector;
+            this.consumer1 = consumer1;
+            this.consumer2 = consumer2;
+        }
+
+        @Override
+        protected ConsumerConnector createConsumerConnector(Properties consumerProperties) {
+            return consumerConnector;
+        }
+
+        @Override
+        protected <T> org.apache.atlas.kafka.KafkaConsumer<T>
+        createKafkaConsumer(Class<T> type, MessageDeserializer<T> deserializer, KafkaStream stream,
+                            int consumerId, ConsumerConnector connector, boolean autoCommitEnabled) {
+            if (consumerId == 0) {
+                return consumer1;
+            } else if (consumerId == 1) {
+                return consumer2;
+            }
+            return null;
+        }
+
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
index 2a49634..a810029 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
@@ -15,184 +15,93 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.atlas.kafka;
 
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.serializer.StringDecoder;
-import org.apache.atlas.notification.MessageDeserializer;
+import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.notification.NotificationConsumer;
-import org.apache.atlas.notification.NotificationException;
 import org.apache.atlas.notification.NotificationInterface;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.TopicPartition;
+import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.RandomStringUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
 
 public class KafkaNotificationTest {
 
-    @Test
-    @SuppressWarnings("unchecked")
-    public void testCreateConsumers() throws Exception {
-        Properties properties = mock(Properties.class);
-        when(properties.getProperty("entities.group.id")).thenReturn("atlas");
-        final ConsumerConnector consumerConnector = mock(ConsumerConnector.class);
-        Map<String, Integer> topicCountMap = new HashMap<>();
-        topicCountMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, 1);
-
-        Map<String, List<KafkaStream<String, String>>> kafkaStreamsMap =
-                new HashMap<>();
-        List<KafkaStream<String, String>> kafkaStreams = new ArrayList<>();
-        KafkaStream kafkaStream = mock(KafkaStream.class);
-        kafkaStreams.add(kafkaStream);
-        kafkaStreamsMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, kafkaStreams);
-
-        when(consumerConnector.createMessageStreams(
-                eq(topicCountMap), any(StringDecoder.class), any(StringDecoder.class))).thenReturn(kafkaStreamsMap);
-
-        final KafkaConsumer consumer1 = mock(KafkaConsumer.class);
-        final KafkaConsumer consumer2 = mock(KafkaConsumer.class);
-
-        KafkaNotification kafkaNotification =
-                new TestKafkaNotification(properties, consumerConnector, consumer1, consumer2);
-
-        List<NotificationConsumer<String>> consumers =
-                kafkaNotification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 2);
-
-        verify(consumerConnector, times(2)).createMessageStreams(
-                eq(topicCountMap), any(StringDecoder.class), any(StringDecoder.class));
-        assertEquals(consumers.size(), 2);
-        assertTrue(consumers.contains(consumer1));
-        assertTrue(consumers.contains(consumer2));
-    }
+    private KafkaNotification kafkaNotification;
 
-    @Test
-    @SuppressWarnings("unchecked")
-    public void shouldSendMessagesSuccessfully() throws NotificationException,
-            ExecutionException, InterruptedException {
-        Properties configProperties = mock(Properties.class);
-        KafkaNotification kafkaNotification = new KafkaNotification(configProperties);
-
-        Producer producer = mock(Producer.class);
-        String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK);
-        String message = "This is a test message";
-        Future returnValue = mock(Future.class);
-        when(returnValue.get()).thenReturn(new RecordMetadata(new TopicPartition(topicName, 0), 0, 0));
-        ProducerRecord expectedRecord = new ProducerRecord(topicName, message);
-        when(producer.send(expectedRecord)).thenReturn(returnValue);
-
-        kafkaNotification.sendInternalToProducer(producer,
-                NotificationInterface.NotificationType.HOOK, new String[]{message});
-
-        verify(producer).send(expectedRecord);
-    }
+    @BeforeClass
+    public void setup() throws Exception {
+        Configuration properties = ApplicationProperties.get();
+        properties.setProperty("atlas.kafka.data", "target/" + RandomStringUtils.randomAlphanumeric(5));
 
-    @Test
-    @SuppressWarnings("unchecked")
-    public void shouldThrowExceptionIfProducerFails() throws NotificationException,
-            ExecutionException, InterruptedException {
-        Properties configProperties = mock(Properties.class);
-        KafkaNotification kafkaNotification = new KafkaNotification(configProperties);
-
-        Producer producer = mock(Producer.class);
-        String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK);
-        String message = "This is a test message";
-        Future returnValue = mock(Future.class);
-        when(returnValue.get()).thenThrow(new RuntimeException("Simulating exception"));
-        ProducerRecord expectedRecord = new ProducerRecord(topicName, message);
-        when(producer.send(expectedRecord)).thenReturn(returnValue);
-
-        try {
-            kafkaNotification.sendInternalToProducer(producer,
-                NotificationInterface.NotificationType.HOOK, new String[]{message});
-            fail("Should have thrown NotificationException");
-        } catch (NotificationException e) {
-            assertEquals(e.getFailedMessages().size(), 1);
-            assertEquals(e.getFailedMessages().get(0), "This is a test message");
-        }
+        kafkaNotification = new KafkaNotification(properties);
+        kafkaNotification.start();
     }
 
-    @Test
-    @SuppressWarnings("unchecked")
-    public void shouldCollectAllFailedMessagesIfProducerFails() throws NotificationException,
-            ExecutionException, InterruptedException {
-        Properties configProperties = mock(Properties.class);
-        KafkaNotification kafkaNotification = new KafkaNotification(configProperties);
-
-        Producer producer = mock(Producer.class);
-        String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK);
-        String message1 = "This is a test message1";
-        String message2 = "This is a test message2";
-        Future returnValue1 = mock(Future.class);
-        when(returnValue1.get()).thenThrow(new RuntimeException("Simulating exception"));
-        Future returnValue2 = mock(Future.class);
-        when(returnValue2.get()).thenThrow(new RuntimeException("Simulating exception"));
-        ProducerRecord expectedRecord1 = new ProducerRecord(topicName, message1);
-        when(producer.send(expectedRecord1)).thenReturn(returnValue1);
-        ProducerRecord expectedRecord2 = new ProducerRecord(topicName, message2);
-        when(producer.send(expectedRecord2)).thenReturn(returnValue1);
-
-        try {
-            kafkaNotification.sendInternalToProducer(producer,
-                    NotificationInterface.NotificationType.HOOK, new String[]{message1, message2});
-            fail("Should have thrown NotificationException");
-        } catch (NotificationException e) {
-            assertEquals(e.getFailedMessages().size(), 2);
-            assertEquals(e.getFailedMessages().get(0), "This is a test message1");
-            assertEquals(e.getFailedMessages().get(1), "This is a test message2");
-        }
+    @AfterClass
+    public void shutdown() throws Exception {
+        kafkaNotification.close();
+        kafkaNotification.stop();
     }
 
-    class TestKafkaNotification extends KafkaNotification {
-
-        private final ConsumerConnector consumerConnector;
-        private final KafkaConsumer consumer1;
-        private final KafkaConsumer consumer2;
-
-        TestKafkaNotification(Properties properties, ConsumerConnector consumerConnector,
-                              KafkaConsumer consumer1, KafkaConsumer consumer2) {
-            super(properties);
-            this.consumerConnector = consumerConnector;
-            this.consumer1 = consumer1;
-            this.consumer2 = consumer2;
-        }
-
-        @Override
-        protected ConsumerConnector createConsumerConnector(Properties consumerProperties) {
-            return consumerConnector;
-        }
-
-        @Override
-        protected <T> org.apache.atlas.kafka.KafkaConsumer<T>
-        createKafkaConsumer(Class<T> type, MessageDeserializer<T> deserializer, KafkaStream stream,
-                            int consumerId, ConsumerConnector connector, boolean autoCommitEnabled) {
-            if (consumerId == 0) {
-                return consumer1;
-            } else if (consumerId == 1) {
-                return consumer2;
-            }
-            return null;
-        }
-
-
+    @Test
+    public void testNext() throws Exception {
+        kafkaNotification.send(NotificationInterface.NotificationType.HOOK,
+                new HookNotification.EntityCreateRequest("u1", new Referenceable("type")));
+        kafkaNotification.send(NotificationInterface.NotificationType.HOOK,
+                new HookNotification.EntityCreateRequest("u2", new Referenceable("type")));
+        kafkaNotification.send(NotificationInterface.NotificationType.HOOK,
+                new HookNotification.EntityCreateRequest("u3", new Referenceable("type")));
+        kafkaNotification.send(NotificationInterface.NotificationType.HOOK,
+                new HookNotification.EntityCreateRequest("u4", new Referenceable("type")));
+
+        NotificationConsumer<Object> consumer =
+                kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0);
+        assertTrue(consumer.hasNext());
+        HookNotification.HookNotificationMessage message = (HookNotification.HookNotificationMessage) consumer.next();
+        assertEquals(message.getUser(), "u1");
+
+        assertTrue(consumer.hasNext());
+        message = (HookNotification.HookNotificationMessage) consumer.next();
+        assertEquals(message.getUser(), "u2");
+        consumer.close();
+
+        //nothing committed(even though u1 and u2 are read), now should restart from u1
+        consumer = kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0);
+        assertTrue(consumer.hasNext());
+        message = (HookNotification.HookNotificationMessage) consumer.next();
+        assertEquals(message.getUser(), "u1");
+        consumer.commit();
+
+        assertTrue(consumer.hasNext());
+        message = (HookNotification.HookNotificationMessage) consumer.next();
+        assertEquals(message.getUser(), "u2");
+        consumer.close();
+
+        //u1 committed, u2 read, should start from u2
+        consumer = kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0);
+        assertTrue(consumer.hasNext());
+        message = (HookNotification.HookNotificationMessage) consumer.next();
+        assertEquals(message.getUser(), "u2");
+
+        assertTrue(consumer.hasNext());
+        message = (HookNotification.HookNotificationMessage) consumer.next();
+        assertEquals(message.getUser(), "u3");
+        consumer.commit();
+        consumer.close();
+
+        //u2, u3 read, but only u3 committed, should start from u4
+        consumer = kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0);
+        assertTrue(consumer.hasNext());
+        message = (HookNotification.HookNotificationMessage) consumer.next();
+        assertEquals(message.getUser(), "u4");
+        consumer.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
index 0c8990f..ed5b9fc 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
@@ -262,6 +262,11 @@ public class AbstractNotificationConsumerTest {
         public void commit() {
             // do nothing.
         }
+
+        @Override
+        public void close() {
+            //do nothing
+        }
     }
 
     private static final class TestDeserializer<T> extends VersionedMessageDeserializer<T> {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/plugin-classloader/src/main/java/org/apache/atlas/plugin/classloader/AtlasPluginClassLoader.java
----------------------------------------------------------------------
diff --git a/plugin-classloader/src/main/java/org/apache/atlas/plugin/classloader/AtlasPluginClassLoader.java b/plugin-classloader/src/main/java/org/apache/atlas/plugin/classloader/AtlasPluginClassLoader.java
index 92cc406..0dbf352 100644
--- a/plugin-classloader/src/main/java/org/apache/atlas/plugin/classloader/AtlasPluginClassLoader.java
+++ b/plugin-classloader/src/main/java/org/apache/atlas/plugin/classloader/AtlasPluginClassLoader.java
@@ -75,16 +75,16 @@ public final class AtlasPluginClassLoader extends URLClassLoader {
 
     @Override
     public Class<?> findClass(String name) throws ClassNotFoundException {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("==> AtlasPluginClassLoader.findClass(" + name + ")");
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("==> AtlasPluginClassLoader.findClass(" + name + ")");
         }
 
         Class<?> ret = null;
 
         try {
             // first try to find the class in pluginClassloader
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("AtlasPluginClassLoader.findClass(" + name + "): calling pluginClassLoader.findClass()");
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("AtlasPluginClassLoader.findClass(" + name + "): calling pluginClassLoader.findClass()");
             }
 
             ret = super.findClass(name);
@@ -93,8 +93,8 @@ public final class AtlasPluginClassLoader extends URLClassLoader {
             MyClassLoader savedClassLoader = getComponentClassLoader();
 
             if (savedClassLoader != null) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug(
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace(
                             "AtlasPluginClassLoader.findClass(" + name + "): calling componentClassLoader.findClass()");
                 }
 
@@ -102,8 +102,8 @@ public final class AtlasPluginClassLoader extends URLClassLoader {
             }
         }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("<== AtlasPluginClassLoader.findClass(" + name + "): " + ret);
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("<== AtlasPluginClassLoader.findClass(" + name + "): " + ret);
         }
 
         return ret;
@@ -111,16 +111,16 @@ public final class AtlasPluginClassLoader extends URLClassLoader {
 
     @Override
     public Class<?> loadClass(String name) throws ClassNotFoundException {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("==> AtlasPluginClassLoader.loadClass(" + name + ")");
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("==> AtlasPluginClassLoader.loadClass(" + name + ")");
         }
 
         Class<?> ret = null;
 
         try {
             // first try to load the class from pluginClassloader
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("AtlasPluginClassLoader.loadClass(" + name + "): calling pluginClassLoader.loadClass()");
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("AtlasPluginClassLoader.loadClass(" + name + "): calling pluginClassLoader.loadClass()");
             }
 
             ret = super.loadClass(name);
@@ -129,8 +129,8 @@ public final class AtlasPluginClassLoader extends URLClassLoader {
             MyClassLoader savedClassLoader = getComponentClassLoader();
 
             if (savedClassLoader != null) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug(
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace(
                             "AtlasPluginClassLoader.loadClass(" + name + "): calling componentClassLoader.loadClass()");
                 }
 
@@ -138,8 +138,8 @@ public final class AtlasPluginClassLoader extends URLClassLoader {
             }
         }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("<== AtlasPluginClassLoader.loadClass(" + name + "): " + ret);
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("<== AtlasPluginClassLoader.loadClass(" + name + "): " + ret);
         }
 
         return ret;

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index a2fb2c0..c55dffd 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -28,6 +28,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
 ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
 
 ALL CHANGES:
+ATLAS-1111 Data loss is observed when atlas is restarted while hive_table metadata ingestion into kafka topic is in-progress(shwethags via sumasai)
 ATLAS-1108: updated references to atlas.rest.address to handle multiple URLs
 ATLAS-1112 Hive table GET response from atlas server had duplicate column entries ( ayubkhan, mneethiraj via sumasai)
 ATLAS-1108 In Atlas HA mode , import-hive.sh in Passive instance fails. (ayubkhan via sumasai)

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
index b9689f4..20e8ebc 100644
--- a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
+++ b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
@@ -41,7 +41,7 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
         try {
             Object response = invocation.proceed();
             titanGraph.commit();
-            LOG.debug("graph commit");
+            LOG.info("graph commit");
             return response;
         } catch (Throwable t) {
             titanGraph.rollback();

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/typesystem/src/main/resources/atlas-log4j.xml
----------------------------------------------------------------------
diff --git a/typesystem/src/main/resources/atlas-log4j.xml b/typesystem/src/main/resources/atlas-log4j.xml
index 0f7573e..5a48854 100755
--- a/typesystem/src/main/resources/atlas-log4j.xml
+++ b/typesystem/src/main/resources/atlas-log4j.xml
@@ -57,6 +57,19 @@
     </logger>
     -->
 
+    <appender name="FAILED" class="org.apache.log4j.DailyRollingFileAppender">
+        <param name="File" value="${atlas.log.dir}/failed.log"/>
+        <param name="Append" value="true"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %m%n"/> <!-- %n ends each record with a line separator -->
+        </layout>
+    </appender>
+
+    <logger name="FAILED" additivity="false">
+        <level value="info"/>
+        <appender-ref ref="FAILED"/> <!-- route to the FAILED appender (failed.log), not the audit log -->
+    </logger>
+
     <logger name="com.thinkaurelius.titan" additivity="false">
         <level value="info"/>
         <appender-ref ref="console"/>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/typesystem/src/test/resources/atlas-application.properties
----------------------------------------------------------------------
diff --git a/typesystem/src/test/resources/atlas-application.properties b/typesystem/src/test/resources/atlas-application.properties
index a3b6c90..fb31462 100644
--- a/typesystem/src/test/resources/atlas-application.properties
+++ b/typesystem/src/test/resources/atlas-application.properties
@@ -77,7 +77,7 @@ atlas.kafka.bootstrap.servers=localhost:19027
 atlas.kafka.data=${sys:atlas.data}/kafka
 atlas.kafka.zookeeper.session.timeout.ms=4000
 atlas.kafka.zookeeper.sync.time.ms=20
-atlas.kafka.consumer.timeout.ms=100
+atlas.kafka.consumer.timeout.ms=4000
 atlas.kafka.auto.commit.interval.ms=100
 atlas.kafka.hook.group.id=atlas
 atlas.kafka.entities.group.id=atlas_entities
@@ -122,4 +122,4 @@ atlas.auth.policy.file=${sys:user.dir}/distro/src/conf/policy-store.txt
 atlas.authentication.method.file=true
 atlas.authentication.method.ldap.type=none
 atlas.authentication.method.file.filename=${sys:user.dir}/distro/src/conf/users-credentials.properties
-atlas.authentication.method.kerberos=false
\ No newline at end of file
+atlas.authentication.method.kerberos=false

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 901b1ed..6b1f3f2 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -24,6 +24,7 @@ import com.google.inject.Singleton;
 import kafka.consumer.ConsumerTimeoutException;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasException;
+import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.LocalAtlasClient;
 import org.apache.atlas.ha.HAConfiguration;
 import org.apache.atlas.listener.ActiveStateChangeHandler;
@@ -46,11 +47,18 @@ import java.util.concurrent.atomic.AtomicBoolean;
 @Singleton
 public class NotificationHookConsumer implements Service, ActiveStateChangeHandler {
     private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class);
+    private static final Logger FAILED_LOG = LoggerFactory.getLogger("FAILED"); // logger for dropped notifications
+
     private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName();
 
     public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
+    public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries";
+    public static final String CONSUMER_FAILEDCACHESIZE_PROPERTY = "atlas.notification.hook.failedcachesize";
+
     public static final int SERVER_READY_WAIT_TIME_MS = 1000;
     private final LocalAtlasClient atlasClient;
+    private final int maxRetries;
+    private final int failedMsgCacheSize;
 
     private NotificationInterface notificationInterface;
     private ExecutorService executors;
@@ -58,20 +66,23 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     private List<HookConsumer> consumers;
 
     @Inject
-    public NotificationHookConsumer(NotificationInterface notificationInterface, LocalAtlasClient atlasClient) {
+    public NotificationHookConsumer(NotificationInterface notificationInterface, LocalAtlasClient atlasClient)
+            throws AtlasException {
         this.notificationInterface = notificationInterface;
         this.atlasClient = atlasClient;
+        this.applicationProperties = ApplicationProperties.get();
+
+        maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3); // attempts per message; defaults to 3
+        failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 20); // failed messages cached before being logged; defaults to 20
+
     }
 
     @Override
     public void start() throws AtlasException {
-        Configuration configuration = ApplicationProperties.get();
-        startInternal(configuration, null);
+        startInternal(applicationProperties, null);
     }
 
-    void startInternal(Configuration configuration,
-                       ExecutorService executorService) {
-        this.applicationProperties = configuration;
+    void startInternal(Configuration configuration, ExecutorService executorService) {
         if (consumers == null) {
             consumers = new ArrayList<>();
         }
@@ -103,16 +114,16 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     @Override
     public void stop() {
         //Allow for completion of outstanding work
-        notificationInterface.close();
         try {
+            stopConsumerThreads();
             if (executors != null) {
-                stopConsumerThreads();
-                executors.shutdownNow();
+                executors.shutdown();
                 if (!executors.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                     LOG.error("Timed out waiting for consumer threads to shut down, exiting uncleanly");
                 }
                 executors = null;
             }
+            notificationInterface.close();
         } catch (InterruptedException e) {
             LOG.error("Failure in shutting down consumers");
         }
@@ -160,6 +171,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     class HookConsumer implements Runnable {
         private final NotificationConsumer<HookNotification.HookNotificationMessage> consumer;
         private final AtomicBoolean shouldRun = new AtomicBoolean(false);
+        private List<HookNotification.HookNotificationMessage> failedMessages = new ArrayList<>();
 
         public HookConsumer(NotificationConsumer<HookNotification.HookNotificationMessage> consumer) {
             this.consumer = consumer;
@@ -193,45 +205,71 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         }
 
         @VisibleForTesting
-        void handleMessage(HookNotification.HookNotificationMessage message) {
-            atlasClient.setUser(message.getUser());
-            try {
-                switch (message.getType()) {
-                case ENTITY_CREATE:
-                    HookNotification.EntityCreateRequest createRequest =
+        void handleMessage(HookNotification.HookNotificationMessage message) throws
+            AtlasServiceException, AtlasException { // applies one hook message, attempting it up to maxRetries times
+            for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
+                LOG.debug("Running attempt {}", numRetries);
+                try {
+                    atlasClient.setUser(message.getUser());
+                    switch (message.getType()) {
+                    case ENTITY_CREATE:
+                        HookNotification.EntityCreateRequest createRequest =
                             (HookNotification.EntityCreateRequest) message;
-                    atlasClient.createEntity(createRequest.getEntities());
-                    break;
+                        atlasClient.createEntity(createRequest.getEntities());
+                        break;
 
-                case ENTITY_PARTIAL_UPDATE:
-                    HookNotification.EntityPartialUpdateRequest partialUpdateRequest =
+                    case ENTITY_PARTIAL_UPDATE:
+                        HookNotification.EntityPartialUpdateRequest partialUpdateRequest =
                             (HookNotification.EntityPartialUpdateRequest) message;
-                    atlasClient.updateEntity(partialUpdateRequest.getTypeName(),
+                        atlasClient.updateEntity(partialUpdateRequest.getTypeName(),
                             partialUpdateRequest.getAttribute(),
                             partialUpdateRequest.getAttributeValue(), partialUpdateRequest.getEntity());
-                    break;
+                        break;
 
-                case ENTITY_DELETE:
-                    HookNotification.EntityDeleteRequest deleteRequest =
-                        (HookNotification.EntityDeleteRequest) message;
-                    atlasClient.deleteEntity(deleteRequest.getTypeName(),
-                        deleteRequest.getAttribute(),
-                        deleteRequest.getAttributeValue());
-                    break;
+                    case ENTITY_DELETE:
+                        HookNotification.EntityDeleteRequest deleteRequest =
+                            (HookNotification.EntityDeleteRequest) message;
+                        atlasClient.deleteEntity(deleteRequest.getTypeName(),
+                            deleteRequest.getAttribute(),
+                            deleteRequest.getAttributeValue());
+                        break;
 
-                case ENTITY_FULL_UPDATE:
-                    HookNotification.EntityUpdateRequest updateRequest =
+                    case ENTITY_FULL_UPDATE:
+                        HookNotification.EntityUpdateRequest updateRequest =
                             (HookNotification.EntityUpdateRequest) message;
-                    atlasClient.updateEntities(updateRequest.getEntities());
-                    break;
+                        atlasClient.updateEntities(updateRequest.getEntities());
+                        break;
 
-                default:
-                    throw new IllegalStateException("Unhandled exception!");
+                    default:
+                        throw new IllegalStateException("Unhandled exception!");
+                    }
+
+                    break; // attempt succeeded: leave the retry loop and fall through to commit()
+                } catch (Throwable e) { // any failure, including the IllegalStateException above, counts as a failed attempt
+                    LOG.warn("Error handling message", e);
+                    if (numRetries == (maxRetries - 1)) { // this was the last allowed attempt
+                        LOG.warn("Max retries exceeded for message {}", message, e);
+                        failedMessages.add(message);
+                        if (failedMessages.size() >= failedMsgCacheSize) { // cache is full: write cached failures to the failed log
+                            recordFailedMessages();
+                        }
+                        return; // give up on this message without calling commit()
+                    }
                 }
-            } catch (Exception e) {
-                //todo handle failures
-                LOG.warn("Error handling message {}", message, e);
             }
+            commit();
+        }
+
+        private void recordFailedMessages() {
+            // write every cached failed message to the FAILED logger as JSON, then empty the cache
+            for (HookNotification.HookNotificationMessage failedMessage : failedMessages) {
+                FAILED_LOG.error("[DROPPED_NOTIFICATION] {}", AbstractNotification.getMessageJson(failedMessage));
+            }
+            failedMessages.clear();
+        }
+
+        private void commit() {
+            recordFailedMessages(); // log and clear any cached failed messages before committing the consumer
             consumer.commit();
         }
 
@@ -260,6 +298,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
         public void stop() {
             shouldRun.set(false);
+            consumer.close(); // also close the NotificationConsumer owned by this HookConsumer
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
index 0a7c5df..a1d3187 100755
--- a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
+++ b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
@@ -155,8 +155,11 @@ public class GuiceServletConfig extends GuiceServletContextListener {
 
     @Override
     public void contextDestroyed(ServletContextEvent servletContextEvent) {
-        super.contextDestroyed(servletContextEvent);
+        LOG.info("Starting servlet context destroy");
         if(injector != null) {
+            //stop services
+            stopServices();
+
             TypeLiteral<GraphProvider<TitanGraph>> graphProviderType = new TypeLiteral<GraphProvider<TitanGraph>>() {};
             Provider<GraphProvider<TitanGraph>> graphProvider = injector.getProvider(Key.get(graphProviderType));
             final Graph graph = graphProvider.get().get();
@@ -166,15 +169,13 @@ public class GuiceServletConfig extends GuiceServletContextListener {
             } catch(Throwable t) {
                 LOG.warn("Error while shutting down graph", t);
             }
-
-            //stop services
-            stopServices();
         }
+        super.contextDestroyed(servletContextEvent);
     }
 
     protected void stopServices() {
-        LOG.debug("Stopping services");
+        LOG.info("Stopping services");
         Services services = injector.getInstance(Services.class);
         services.stop();
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/webapp/src/main/webapp/WEB-INF/web.xml
----------------------------------------------------------------------
diff --git a/webapp/src/main/webapp/WEB-INF/web.xml b/webapp/src/main/webapp/WEB-INF/web.xml
index 34b6856..2e36b94 100755
--- a/webapp/src/main/webapp/WEB-INF/web.xml
+++ b/webapp/src/main/webapp/WEB-INF/web.xml
@@ -63,11 +63,11 @@
     </filter-mapping>
 
     <listener>
-        <listener-class>org.apache.atlas.web.listeners.GuiceServletConfig</listener-class>
+        <listener-class>org.springframework.web.util.Log4jConfigListener</listener-class>
     </listener>
-
+    
     <listener>
-        <listener-class>org.springframework.web.util.Log4jConfigListener</listener-class>
+        <listener-class>org.apache.atlas.web.listeners.GuiceServletConfig</listener-class>
     </listener>
 
     <listener>
@@ -80,4 +80,4 @@
 
 
 	
-</web-app>
\ No newline at end of file
+</web-app>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index 6fd1939..683a028 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -21,11 +21,13 @@ package org.apache.atlas.notification;
 import com.google.inject.Inject;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasException;
+import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.LocalAtlasClient;
 import org.apache.atlas.kafka.KafkaNotification;
 import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.commons.lang.RandomStringUtils;
+import org.testng.Assert;
 import org.testng.annotations.AfterTest;
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Guice;
@@ -54,7 +56,6 @@ public class NotificationHookConsumerKafkaTest {
 
     @Test
     public void testConsumerConsumesNewMessageWithAutoCommitDisabled() throws AtlasException, InterruptedException {
-
         produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity()));
 
         NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
@@ -68,7 +69,6 @@ public class NotificationHookConsumerKafkaTest {
         consumeOneMessage(consumer, hookConsumer);
         verify(localAtlasClient).setUser("test_user1");
 
-
         // produce another message, and make sure it moves ahead. If commit succeeded, this would work.
         produceMessage(new HookNotification.EntityCreateRequest("test_user2", createEntity()));
         consumeOneMessage(consumer, hookConsumer);
@@ -77,10 +77,8 @@ public class NotificationHookConsumerKafkaTest {
         kafkaNotification.close();
     }
 
-    @Test
-    public void testConsumerRemainsAtSameMessageWithAutoCommitEnabled()
-            throws NotificationException, InterruptedException {
-
+    @Test(dependsOnMethods = "testConsumerConsumesNewMessageWithAutoCommitDisabled")
+    public void testConsumerRemainsAtSameMessageWithAutoCommitEnabled() throws Exception {
         produceMessage(new HookNotification.EntityCreateRequest("test_user3", createEntity()));
 
         NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
@@ -114,7 +112,14 @@ public class NotificationHookConsumerKafkaTest {
         while (!consumer.hasNext()) {
             Thread.sleep(1000);
         }
-        hookConsumer.handleMessage(consumer.next());
+
+        try {
+            hookConsumer.handleMessage(consumer.next());
+        } catch (AtlasServiceException e) {
+            Assert.fail("Consumer failed with exception ", e);
+        } catch (AtlasException e) {
+            Assert.fail("Consumer failed with exception ", e);
+        }
     }
 
     Referenceable createEntity() {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
index d22c5f1..f06f791 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -18,10 +18,12 @@
 package org.apache.atlas.notification;
 
 import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasException;
 import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.LocalAtlasClient;
 import org.apache.atlas.ha.HAConfiguration;
 import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.typesystem.Referenceable;
 import org.apache.commons.configuration.Configuration;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
@@ -62,7 +64,7 @@ public class NotificationHookConsumerTest {
     }
 
     @Test
-    public void testConsumerCanProceedIfServerIsReady() throws InterruptedException, AtlasServiceException {
+    public void testConsumerCanProceedIfServerIsReady() throws Exception {
         NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
         NotificationHookConsumer.HookConsumer hookConsumer =
                 notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
@@ -75,7 +77,7 @@ public class NotificationHookConsumerTest {
     }
 
     @Test
-    public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws AtlasServiceException, InterruptedException {
+    public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws Exception {
         NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
         NotificationHookConsumer.HookConsumer hookConsumer =
                 notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
@@ -88,7 +90,7 @@ public class NotificationHookConsumerTest {
     }
 
     @Test
-    public void testCommitIsCalledWhenMessageIsProcessed() throws AtlasServiceException {
+    public void testCommitIsCalledWhenMessageIsProcessed() throws AtlasServiceException, AtlasException {
         NotificationHookConsumer notificationHookConsumer =
                 new NotificationHookConsumer(notificationInterface, atlasClient);
         NotificationConsumer consumer = mock(NotificationConsumer.class);
@@ -104,25 +106,22 @@ public class NotificationHookConsumerTest {
     }
 
     @Test
-    public void testCommitIsCalledEvenWhenMessageProcessingFails() throws AtlasServiceException {
+    public void testCommitIsNotCalledEvenWhenMessageProcessingFails() throws AtlasServiceException, AtlasException {
         NotificationHookConsumer notificationHookConsumer =
                 new NotificationHookConsumer(notificationInterface, atlasClient);
         NotificationConsumer consumer = mock(NotificationConsumer.class);
         NotificationHookConsumer.HookConsumer hookConsumer =
                 notificationHookConsumer.new HookConsumer(consumer);
-        HookNotification.EntityCreateRequest message = mock(HookNotification.EntityCreateRequest.class);
-        when(message.getUser()).thenReturn("user");
-        when(message.getType()).thenReturn(HookNotification.HookNotificationType.ENTITY_CREATE);
+        HookNotification.EntityCreateRequest message = new HookNotification.EntityCreateRequest("user", new ArrayList<Referenceable>());
         when(atlasClient.createEntity(any(List.class))).
                 thenThrow(new RuntimeException("Simulating exception in processing message"));
-
         hookConsumer.handleMessage(message);
 
-        verify(consumer).commit();
+        verifyZeroInteractions(consumer);
     }
 
     @Test
-    public void testConsumerProceedsWithFalseIfInterrupted() throws AtlasServiceException, InterruptedException {
+    public void testConsumerProceedsWithFalseIfInterrupted() throws Exception {
         NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
         NotificationHookConsumer.HookConsumer hookConsumer =
                 notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
@@ -134,7 +133,7 @@ public class NotificationHookConsumerTest {
     }
 
     @Test
-    public void testConsumerProceedsWithFalseOnAtlasServiceException() throws AtlasServiceException {
+    public void testConsumerProceedsWithFalseOnAtlasServiceException() throws Exception {
         NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
         NotificationHookConsumer.HookConsumer hookConsumer =
                 notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
@@ -146,7 +145,7 @@ public class NotificationHookConsumerTest {
     }
 
     @Test
-    public void testConsumersStartedIfHAIsDisabled() {
+    public void testConsumersStartedIfHAIsDisabled() throws Exception {
         when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false);
         when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
         List<NotificationConsumer<Object>> consumers = new ArrayList();
@@ -160,7 +159,7 @@ public class NotificationHookConsumerTest {
     }
 
     @Test
-    public void testConsumersAreNotStartedIfHAIsEnabled() {
+    public void testConsumersAreNotStartedIfHAIsEnabled() throws Exception {
         when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
         when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
         when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
@@ -174,7 +173,7 @@ public class NotificationHookConsumerTest {
     }
 
     @Test
-    public void testConsumersAreStartedWhenInstanceBecomesActive() {
+    public void testConsumersAreStartedWhenInstanceBecomesActive() throws Exception {
         when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
         when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
         when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
@@ -190,7 +189,7 @@ public class NotificationHookConsumerTest {
     }
 
     @Test
-    public void testConsumersAreStoppedWhenInstanceBecomesPassive() {
+    public void testConsumersAreStoppedWhenInstanceBecomesPassive() throws Exception {
         when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
         when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
         List<NotificationConsumer<Object>> consumers = new ArrayList();
@@ -201,6 +200,6 @@ public class NotificationHookConsumerTest {
         notificationHookConsumer.startInternal(configuration, executorService);
         notificationHookConsumer.instanceIsPassive();
         verify(notificationInterface).close();
-        verify(executorService).shutdownNow();
+        verify(executorService).shutdown();
     }
 }