You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2016/12/23 01:46:46 UTC
[17/50] [abbrv] incubator-atlas git commit: ATLAS-1111 Data loss is
observed when atlas is restarted while hive_table metadata ingestion into
kafka topic is in-progress(shwethags via sumasai)
ATLAS-1111 Data loss is observed when atlas is restarted while hive_table metadata ingestion into kafka topic is in-progress(shwethags via sumasai)
(cherry picked from commit 30893c5e5be5a1c3a2f104d554cc3772a6ef7b81)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/9ea1ad6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/9ea1ad6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/9ea1ad6d
Branch: refs/heads/0.7-incubating
Commit: 9ea1ad6d084b593d5ed88fd4b191c03907f49de1
Parents: f1e8906
Author: Suma Shivaprasad <su...@gmail.com>
Authored: Fri Aug 12 14:03:55 2016 -0700
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Thu Dec 22 15:17:17 2016 -0800
----------------------------------------------------------------------
.../java/org/apache/atlas/service/Services.java | 4 +-
distro/src/conf/atlas-log4j.xml | 13 ++
.../org/apache/atlas/kafka/KafkaConsumer.java | 5 +
.../notification/NotificationConsumer.java | 2 +
.../notification/hook/HookNotification.java | 2 +-
.../atlas/kafka/KafkaNotificationMockTest.java | 198 ++++++++++++++++
.../atlas/kafka/KafkaNotificationTest.java | 233 ++++++-------------
.../AbstractNotificationConsumerTest.java | 5 +
.../classloader/AtlasPluginClassLoader.java | 32 +--
release-log.txt | 1 +
.../atlas/GraphTransactionInterceptor.java | 2 +-
typesystem/src/main/resources/atlas-log4j.xml | 13 ++
.../test/resources/atlas-application.properties | 4 +-
.../notification/NotificationHookConsumer.java | 113 ++++++---
.../atlas/web/listeners/GuiceServletConfig.java | 13 +-
webapp/src/main/webapp/WEB-INF/web.xml | 8 +-
.../NotificationHookConsumerKafkaTest.java | 19 +-
.../NotificationHookConsumerTest.java | 31 ++-
18 files changed, 444 insertions(+), 254 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/common/src/main/java/org/apache/atlas/service/Services.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/atlas/service/Services.java b/common/src/main/java/org/apache/atlas/service/Services.java
index 8b2e205..588dd8e 100644
--- a/common/src/main/java/org/apache/atlas/service/Services.java
+++ b/common/src/main/java/org/apache/atlas/service/Services.java
@@ -41,7 +41,7 @@ public class Services {
public void start() {
try {
for (Service service : services) {
- LOG.debug("Starting service {}", service.getClass().getName());
+ LOG.info("Starting service {}", service.getClass().getName());
service.start();
}
} catch (Exception e) {
@@ -51,7 +51,7 @@ public class Services {
public void stop() {
for (Service service : services) {
- LOG.debug("Stopping service {}", service.getClass().getName());
+ LOG.info("Stopping service {}", service.getClass().getName());
try {
service.stop();
} catch (Throwable e) {
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/distro/src/conf/atlas-log4j.xml
----------------------------------------------------------------------
diff --git a/distro/src/conf/atlas-log4j.xml b/distro/src/conf/atlas-log4j.xml
index eaa4ec5..400cd3a 100755
--- a/distro/src/conf/atlas-log4j.xml
+++ b/distro/src/conf/atlas-log4j.xml
@@ -43,6 +43,14 @@
</layout>
</appender>
+ <appender name="FAILED" class="org.apache.log4j.DailyRollingFileAppender">
+ <param name="File" value="${atlas.log.dir}/failed.log"/>
+ <param name="Append" value="true"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %m%n"/> <!-- %n ends each record; without it entries run together on one line -->
+ </layout>
+ </appender>
+
<logger name="org.apache.atlas" additivity="false">
<level value="info"/>
<appender-ref ref="FILE"/>
@@ -80,6 +88,11 @@
<appender-ref ref="AUDIT"/>
</logger>
+ <logger name="FAILED" additivity="false">
+ <level value="info"/>
+ <appender-ref ref="FAILED"/> <!-- was AUDIT: route this logger to the FAILED appender (failed.log), not the audit log -->
+ </logger>
+
<root>
<priority value="warn"/>
<appender-ref ref="FILE"/>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
index 270215d..16c0eb2 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
@@ -96,4 +96,9 @@ public class KafkaConsumer<T> extends AbstractNotificationConsumer<T> {
LOG.debug("Committed offset: {}", lastSeenOffset);
}
}
+
+ @Override
+ public void close() {
+ consumerConnector.shutdown();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
index 2e861cb..a99cb10 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
@@ -52,4 +52,6 @@ public interface NotificationConsumer<T> {
* restart.
*/
void commit();
+
+ void close();
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java b/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java
index 88a0322..a25aa52 100644
--- a/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java
+++ b/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java
@@ -156,7 +156,7 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN
}
}
- public List<Referenceable> getEntities() throws JSONException {
+ public List<Referenceable> getEntities() {
return entities;
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
new file mode 100644
index 0000000..2126be6
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.kafka;
+
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.serializer.StringDecoder;
+import org.apache.atlas.notification.MessageDeserializer;
+import org.apache.atlas.notification.NotificationConsumer;
+import org.apache.atlas.notification.NotificationException;
+import org.apache.atlas.notification.NotificationInterface;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+public class KafkaNotificationMockTest {
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testCreateConsumers() throws Exception {
+ Properties properties = mock(Properties.class);
+ when(properties.getProperty("entities.group.id")).thenReturn("atlas");
+ final ConsumerConnector consumerConnector = mock(ConsumerConnector.class);
+ Map<String, Integer> topicCountMap = new HashMap<>();
+ topicCountMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, 1);
+
+ Map<String, List<KafkaStream<String, String>>> kafkaStreamsMap =
+ new HashMap<>();
+ List<KafkaStream<String, String>> kafkaStreams = new ArrayList<>();
+ KafkaStream kafkaStream = mock(KafkaStream.class);
+ kafkaStreams.add(kafkaStream);
+ kafkaStreamsMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, kafkaStreams);
+
+ when(consumerConnector.createMessageStreams(
+ eq(topicCountMap), any(StringDecoder.class), any(StringDecoder.class))).thenReturn(kafkaStreamsMap);
+
+ final KafkaConsumer consumer1 = mock(KafkaConsumer.class);
+ final KafkaConsumer consumer2 = mock(KafkaConsumer.class);
+
+ KafkaNotification kafkaNotification =
+ new TestKafkaNotification(properties, consumerConnector, consumer1, consumer2);
+
+ List<NotificationConsumer<String>> consumers =
+ kafkaNotification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 2);
+
+ verify(consumerConnector, times(2)).createMessageStreams(
+ eq(topicCountMap), any(StringDecoder.class), any(StringDecoder.class));
+ assertEquals(consumers.size(), 2);
+ assertTrue(consumers.contains(consumer1));
+ assertTrue(consumers.contains(consumer2));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void shouldSendMessagesSuccessfully() throws NotificationException,
+ ExecutionException, InterruptedException {
+ Properties configProperties = mock(Properties.class);
+ KafkaNotification kafkaNotification = new KafkaNotification(configProperties);
+
+ Producer producer = mock(Producer.class);
+ String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK);
+ String message = "This is a test message";
+ Future returnValue = mock(Future.class);
+ when(returnValue.get()).thenReturn(new RecordMetadata(new TopicPartition(topicName, 0), 0, 0));
+ ProducerRecord expectedRecord = new ProducerRecord(topicName, message);
+ when(producer.send(expectedRecord)).thenReturn(returnValue);
+
+ kafkaNotification.sendInternalToProducer(producer,
+ NotificationInterface.NotificationType.HOOK, new String[]{message});
+
+ verify(producer).send(expectedRecord);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void shouldThrowExceptionIfProducerFails() throws NotificationException,
+ ExecutionException, InterruptedException {
+ Properties configProperties = mock(Properties.class);
+ KafkaNotification kafkaNotification = new KafkaNotification(configProperties);
+
+ Producer producer = mock(Producer.class);
+ String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK);
+ String message = "This is a test message";
+ Future returnValue = mock(Future.class);
+ when(returnValue.get()).thenThrow(new RuntimeException("Simulating exception"));
+ ProducerRecord expectedRecord = new ProducerRecord(topicName, message);
+ when(producer.send(expectedRecord)).thenReturn(returnValue);
+
+ try {
+ kafkaNotification.sendInternalToProducer(producer,
+ NotificationInterface.NotificationType.HOOK, new String[]{message});
+ fail("Should have thrown NotificationException");
+ } catch (NotificationException e) {
+ assertEquals(e.getFailedMessages().size(), 1);
+ assertEquals(e.getFailedMessages().get(0), "This is a test message");
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void shouldCollectAllFailedMessagesIfProducerFails() throws NotificationException,
+ ExecutionException, InterruptedException {
+ Properties configProperties = mock(Properties.class);
+ KafkaNotification kafkaNotification = new KafkaNotification(configProperties);
+
+ Producer producer = mock(Producer.class);
+ String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK);
+ String message1 = "This is a test message1";
+ String message2 = "This is a test message2";
+ Future returnValue1 = mock(Future.class);
+ when(returnValue1.get()).thenThrow(new RuntimeException("Simulating exception"));
+ Future returnValue2 = mock(Future.class);
+ when(returnValue2.get()).thenThrow(new RuntimeException("Simulating exception"));
+ ProducerRecord expectedRecord1 = new ProducerRecord(topicName, message1);
+ when(producer.send(expectedRecord1)).thenReturn(returnValue1);
+ ProducerRecord expectedRecord2 = new ProducerRecord(topicName, message2);
+ when(producer.send(expectedRecord2)).thenReturn(returnValue2); // was returnValue1: give the second record its own failing future
+
+ try { // both futures throw on get(), so both messages must be reported as failed
+ kafkaNotification.sendInternalToProducer(producer,
+ NotificationInterface.NotificationType.HOOK, new String[]{message1, message2});
+ fail("Should have thrown NotificationException");
+ } catch (NotificationException e) {
+ assertEquals(e.getFailedMessages().size(), 2);
+ assertEquals(e.getFailedMessages().get(0), "This is a test message1");
+ assertEquals(e.getFailedMessages().get(1), "This is a test message2");
+ }
+ }
+
+ class TestKafkaNotification extends KafkaNotification {
+
+ private final ConsumerConnector consumerConnector;
+ private final KafkaConsumer consumer1;
+ private final KafkaConsumer consumer2;
+
+ TestKafkaNotification(Properties properties, ConsumerConnector consumerConnector,
+ KafkaConsumer consumer1, KafkaConsumer consumer2) {
+ super(properties);
+ this.consumerConnector = consumerConnector;
+ this.consumer1 = consumer1;
+ this.consumer2 = consumer2;
+ }
+
+ @Override
+ protected ConsumerConnector createConsumerConnector(Properties consumerProperties) {
+ return consumerConnector;
+ }
+
+ @Override
+ protected <T> org.apache.atlas.kafka.KafkaConsumer<T>
+ createKafkaConsumer(Class<T> type, MessageDeserializer<T> deserializer, KafkaStream stream,
+ int consumerId, ConsumerConnector connector, boolean autoCommitEnabled) {
+ if (consumerId == 0) {
+ return consumer1;
+ } else if (consumerId == 1) {
+ return consumer2;
+ }
+ return null;
+ }
+
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
index 2a49634..a810029 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
@@ -15,184 +15,93 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.atlas.kafka;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.serializer.StringDecoder;
-import org.apache.atlas.notification.MessageDeserializer;
+import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.notification.NotificationConsumer;
-import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.TopicPartition;
+import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.RandomStringUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
public class KafkaNotificationTest {
- @Test
- @SuppressWarnings("unchecked")
- public void testCreateConsumers() throws Exception {
- Properties properties = mock(Properties.class);
- when(properties.getProperty("entities.group.id")).thenReturn("atlas");
- final ConsumerConnector consumerConnector = mock(ConsumerConnector.class);
- Map<String, Integer> topicCountMap = new HashMap<>();
- topicCountMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, 1);
-
- Map<String, List<KafkaStream<String, String>>> kafkaStreamsMap =
- new HashMap<>();
- List<KafkaStream<String, String>> kafkaStreams = new ArrayList<>();
- KafkaStream kafkaStream = mock(KafkaStream.class);
- kafkaStreams.add(kafkaStream);
- kafkaStreamsMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, kafkaStreams);
-
- when(consumerConnector.createMessageStreams(
- eq(topicCountMap), any(StringDecoder.class), any(StringDecoder.class))).thenReturn(kafkaStreamsMap);
-
- final KafkaConsumer consumer1 = mock(KafkaConsumer.class);
- final KafkaConsumer consumer2 = mock(KafkaConsumer.class);
-
- KafkaNotification kafkaNotification =
- new TestKafkaNotification(properties, consumerConnector, consumer1, consumer2);
-
- List<NotificationConsumer<String>> consumers =
- kafkaNotification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 2);
-
- verify(consumerConnector, times(2)).createMessageStreams(
- eq(topicCountMap), any(StringDecoder.class), any(StringDecoder.class));
- assertEquals(consumers.size(), 2);
- assertTrue(consumers.contains(consumer1));
- assertTrue(consumers.contains(consumer2));
- }
+ private KafkaNotification kafkaNotification;
- @Test
- @SuppressWarnings("unchecked")
- public void shouldSendMessagesSuccessfully() throws NotificationException,
- ExecutionException, InterruptedException {
- Properties configProperties = mock(Properties.class);
- KafkaNotification kafkaNotification = new KafkaNotification(configProperties);
-
- Producer producer = mock(Producer.class);
- String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK);
- String message = "This is a test message";
- Future returnValue = mock(Future.class);
- when(returnValue.get()).thenReturn(new RecordMetadata(new TopicPartition(topicName, 0), 0, 0));
- ProducerRecord expectedRecord = new ProducerRecord(topicName, message);
- when(producer.send(expectedRecord)).thenReturn(returnValue);
-
- kafkaNotification.sendInternalToProducer(producer,
- NotificationInterface.NotificationType.HOOK, new String[]{message});
-
- verify(producer).send(expectedRecord);
- }
+ @BeforeClass
+ public void setup() throws Exception {
+ Configuration properties = ApplicationProperties.get();
+ properties.setProperty("atlas.kafka.data", "target/" + RandomStringUtils.randomAlphanumeric(5));
- @Test
- @SuppressWarnings("unchecked")
- public void shouldThrowExceptionIfProducerFails() throws NotificationException,
- ExecutionException, InterruptedException {
- Properties configProperties = mock(Properties.class);
- KafkaNotification kafkaNotification = new KafkaNotification(configProperties);
-
- Producer producer = mock(Producer.class);
- String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK);
- String message = "This is a test message";
- Future returnValue = mock(Future.class);
- when(returnValue.get()).thenThrow(new RuntimeException("Simulating exception"));
- ProducerRecord expectedRecord = new ProducerRecord(topicName, message);
- when(producer.send(expectedRecord)).thenReturn(returnValue);
-
- try {
- kafkaNotification.sendInternalToProducer(producer,
- NotificationInterface.NotificationType.HOOK, new String[]{message});
- fail("Should have thrown NotificationException");
- } catch (NotificationException e) {
- assertEquals(e.getFailedMessages().size(), 1);
- assertEquals(e.getFailedMessages().get(0), "This is a test message");
- }
+ kafkaNotification = new KafkaNotification(properties);
+ kafkaNotification.start();
}
- @Test
- @SuppressWarnings("unchecked")
- public void shouldCollectAllFailedMessagesIfProducerFails() throws NotificationException,
- ExecutionException, InterruptedException {
- Properties configProperties = mock(Properties.class);
- KafkaNotification kafkaNotification = new KafkaNotification(configProperties);
-
- Producer producer = mock(Producer.class);
- String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK);
- String message1 = "This is a test message1";
- String message2 = "This is a test message2";
- Future returnValue1 = mock(Future.class);
- when(returnValue1.get()).thenThrow(new RuntimeException("Simulating exception"));
- Future returnValue2 = mock(Future.class);
- when(returnValue2.get()).thenThrow(new RuntimeException("Simulating exception"));
- ProducerRecord expectedRecord1 = new ProducerRecord(topicName, message1);
- when(producer.send(expectedRecord1)).thenReturn(returnValue1);
- ProducerRecord expectedRecord2 = new ProducerRecord(topicName, message2);
- when(producer.send(expectedRecord2)).thenReturn(returnValue1);
-
- try {
- kafkaNotification.sendInternalToProducer(producer,
- NotificationInterface.NotificationType.HOOK, new String[]{message1, message2});
- fail("Should have thrown NotificationException");
- } catch (NotificationException e) {
- assertEquals(e.getFailedMessages().size(), 2);
- assertEquals(e.getFailedMessages().get(0), "This is a test message1");
- assertEquals(e.getFailedMessages().get(1), "This is a test message2");
- }
+ @AfterClass
+ public void shutdown() throws Exception {
+ kafkaNotification.close();
+ kafkaNotification.stop();
}
- class TestKafkaNotification extends KafkaNotification {
-
- private final ConsumerConnector consumerConnector;
- private final KafkaConsumer consumer1;
- private final KafkaConsumer consumer2;
-
- TestKafkaNotification(Properties properties, ConsumerConnector consumerConnector,
- KafkaConsumer consumer1, KafkaConsumer consumer2) {
- super(properties);
- this.consumerConnector = consumerConnector;
- this.consumer1 = consumer1;
- this.consumer2 = consumer2;
- }
-
- @Override
- protected ConsumerConnector createConsumerConnector(Properties consumerProperties) {
- return consumerConnector;
- }
-
- @Override
- protected <T> org.apache.atlas.kafka.KafkaConsumer<T>
- createKafkaConsumer(Class<T> type, MessageDeserializer<T> deserializer, KafkaStream stream,
- int consumerId, ConsumerConnector connector, boolean autoCommitEnabled) {
- if (consumerId == 0) {
- return consumer1;
- } else if (consumerId == 1) {
- return consumer2;
- }
- return null;
- }
-
-
+ @Test
+ public void testNext() throws Exception {
+ kafkaNotification.send(NotificationInterface.NotificationType.HOOK,
+ new HookNotification.EntityCreateRequest("u1", new Referenceable("type")));
+ kafkaNotification.send(NotificationInterface.NotificationType.HOOK,
+ new HookNotification.EntityCreateRequest("u2", new Referenceable("type")));
+ kafkaNotification.send(NotificationInterface.NotificationType.HOOK,
+ new HookNotification.EntityCreateRequest("u3", new Referenceable("type")));
+ kafkaNotification.send(NotificationInterface.NotificationType.HOOK,
+ new HookNotification.EntityCreateRequest("u4", new Referenceable("type")));
+
+ NotificationConsumer<Object> consumer =
+ kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0);
+ assertTrue(consumer.hasNext());
+ HookNotification.HookNotificationMessage message = (HookNotification.HookNotificationMessage) consumer.next();
+ assertEquals(message.getUser(), "u1");
+
+ assertTrue(consumer.hasNext());
+ message = (HookNotification.HookNotificationMessage) consumer.next();
+ assertEquals(message.getUser(), "u2");
+ consumer.close();
+
+ //nothing committed(even though u1 and u2 are read), now should restart from u1
+ consumer = kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0);
+ assertTrue(consumer.hasNext());
+ message = (HookNotification.HookNotificationMessage) consumer.next();
+ assertEquals(message.getUser(), "u1");
+ consumer.commit();
+
+ assertTrue(consumer.hasNext());
+ message = (HookNotification.HookNotificationMessage) consumer.next();
+ assertEquals(message.getUser(), "u2");
+ consumer.close();
+
+ //u1 committed, u2 read, should start from u2
+ consumer = kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0);
+ assertTrue(consumer.hasNext());
+ message = (HookNotification.HookNotificationMessage) consumer.next();
+ assertEquals(message.getUser(), "u2");
+
+ assertTrue(consumer.hasNext());
+ message = (HookNotification.HookNotificationMessage) consumer.next();
+ assertEquals(message.getUser(), "u3");
+ consumer.commit();
+ consumer.close();
+
+ //u2, u3 read, but only u3 committed, should start from u4
+ consumer = kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0);
+ assertTrue(consumer.hasNext());
+ message = (HookNotification.HookNotificationMessage) consumer.next();
+ assertEquals(message.getUser(), "u4");
+ consumer.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
index 0c8990f..ed5b9fc 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
@@ -262,6 +262,11 @@ public class AbstractNotificationConsumerTest {
public void commit() {
// do nothing.
}
+
+ @Override
+ public void close() {
+ //do nothing
+ }
}
private static final class TestDeserializer<T> extends VersionedMessageDeserializer<T> {
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/plugin-classloader/src/main/java/org/apache/atlas/plugin/classloader/AtlasPluginClassLoader.java
----------------------------------------------------------------------
diff --git a/plugin-classloader/src/main/java/org/apache/atlas/plugin/classloader/AtlasPluginClassLoader.java b/plugin-classloader/src/main/java/org/apache/atlas/plugin/classloader/AtlasPluginClassLoader.java
index 92cc406..0dbf352 100644
--- a/plugin-classloader/src/main/java/org/apache/atlas/plugin/classloader/AtlasPluginClassLoader.java
+++ b/plugin-classloader/src/main/java/org/apache/atlas/plugin/classloader/AtlasPluginClassLoader.java
@@ -75,16 +75,16 @@ public final class AtlasPluginClassLoader extends URLClassLoader {
@Override
public Class<?> findClass(String name) throws ClassNotFoundException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> AtlasPluginClassLoader.findClass(" + name + ")");
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("==> AtlasPluginClassLoader.findClass(" + name + ")");
}
Class<?> ret = null;
try {
// first try to find the class in pluginClassloader
- if (LOG.isDebugEnabled()) {
- LOG.debug("AtlasPluginClassLoader.findClass(" + name + "): calling pluginClassLoader.findClass()");
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("AtlasPluginClassLoader.findClass(" + name + "): calling pluginClassLoader.findClass()");
}
ret = super.findClass(name);
@@ -93,8 +93,8 @@ public final class AtlasPluginClassLoader extends URLClassLoader {
MyClassLoader savedClassLoader = getComponentClassLoader();
if (savedClassLoader != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(
"AtlasPluginClassLoader.findClass(" + name + "): calling componentClassLoader.findClass()");
}
@@ -102,8 +102,8 @@ public final class AtlasPluginClassLoader extends URLClassLoader {
}
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("<== AtlasPluginClassLoader.findClass(" + name + "): " + ret);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("<== AtlasPluginClassLoader.findClass(" + name + "): " + ret);
}
return ret;
@@ -111,16 +111,16 @@ public final class AtlasPluginClassLoader extends URLClassLoader {
@Override
public Class<?> loadClass(String name) throws ClassNotFoundException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> AtlasPluginClassLoader.loadClass(" + name + ")");
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("==> AtlasPluginClassLoader.loadClass(" + name + ")");
}
Class<?> ret = null;
try {
// first try to load the class from pluginClassloader
- if (LOG.isDebugEnabled()) {
- LOG.debug("AtlasPluginClassLoader.loadClass(" + name + "): calling pluginClassLoader.loadClass()");
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("AtlasPluginClassLoader.loadClass(" + name + "): calling pluginClassLoader.loadClass()");
}
ret = super.loadClass(name);
@@ -129,8 +129,8 @@ public final class AtlasPluginClassLoader extends URLClassLoader {
MyClassLoader savedClassLoader = getComponentClassLoader();
if (savedClassLoader != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(
"AtlasPluginClassLoader.loadClass(" + name + "): calling componentClassLoader.loadClass()");
}
@@ -138,8 +138,8 @@ public final class AtlasPluginClassLoader extends URLClassLoader {
}
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("<== AtlasPluginClassLoader.loadClass(" + name + "): " + ret);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("<== AtlasPluginClassLoader.loadClass(" + name + "): " + ret);
}
return ret;
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index a2fb2c0..c55dffd 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -28,6 +28,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES:
+ATLAS-1111 Data loss is observed when atlas is restarted while hive_table metadata ingestion into kafka topic is in-progress(shwethags via sumasai)
ATLAS-1108: updated references to atlas.rest.address to handle multiple URLs
ATLAS-1112 Hive table GET response from atlas server had duplicate column entries ( ayubkhan, mneethiraj via sumasai)
ATLAS-1108 In Atlas HA mode , import-hive.sh in Passive instance fails. (ayubkhan via sumasai)
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
index b9689f4..20e8ebc 100644
--- a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
+++ b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
@@ -41,7 +41,7 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
try {
Object response = invocation.proceed();
titanGraph.commit();
- LOG.debug("graph commit");
+ LOG.info("graph commit");
return response;
} catch (Throwable t) {
titanGraph.rollback();
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/typesystem/src/main/resources/atlas-log4j.xml
----------------------------------------------------------------------
diff --git a/typesystem/src/main/resources/atlas-log4j.xml b/typesystem/src/main/resources/atlas-log4j.xml
index 0f7573e..5a48854 100755
--- a/typesystem/src/main/resources/atlas-log4j.xml
+++ b/typesystem/src/main/resources/atlas-log4j.xml
@@ -57,6 +57,19 @@
</logger>
-->
+ <appender name="FAILED" class="org.apache.log4j.DailyRollingFileAppender">
+ <param name="File" value="${atlas.log.dir}/failed.log"/>
+ <param name="Append" value="true"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %m"/>
+ </layout>
+ </appender>
+
+ <logger name="FAILED" additivity="false">
+ <level value="info"/>
+ <appender-ref ref="AUDIT"/>
+ </logger>
+
<logger name="com.thinkaurelius.titan" additivity="false">
<level value="info"/>
<appender-ref ref="console"/>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/typesystem/src/test/resources/atlas-application.properties
----------------------------------------------------------------------
diff --git a/typesystem/src/test/resources/atlas-application.properties b/typesystem/src/test/resources/atlas-application.properties
index a3b6c90..fb31462 100644
--- a/typesystem/src/test/resources/atlas-application.properties
+++ b/typesystem/src/test/resources/atlas-application.properties
@@ -77,7 +77,7 @@ atlas.kafka.bootstrap.servers=localhost:19027
atlas.kafka.data=${sys:atlas.data}/kafka
atlas.kafka.zookeeper.session.timeout.ms=4000
atlas.kafka.zookeeper.sync.time.ms=20
-atlas.kafka.consumer.timeout.ms=100
+atlas.kafka.consumer.timeout.ms=4000
atlas.kafka.auto.commit.interval.ms=100
atlas.kafka.hook.group.id=atlas
atlas.kafka.entities.group.id=atlas_entities
@@ -122,4 +122,4 @@ atlas.auth.policy.file=${sys:user.dir}/distro/src/conf/policy-store.txt
atlas.authentication.method.file=true
atlas.authentication.method.ldap.type=none
atlas.authentication.method.file.filename=${sys:user.dir}/distro/src/conf/users-credentials.properties
-atlas.authentication.method.kerberos=false
\ No newline at end of file
+atlas.authentication.method.kerberos=false
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 901b1ed..6b1f3f2 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -24,6 +24,7 @@ import com.google.inject.Singleton;
import kafka.consumer.ConsumerTimeoutException;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
+import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.LocalAtlasClient;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler;
@@ -46,11 +47,18 @@ import java.util.concurrent.atomic.AtomicBoolean;
@Singleton
public class NotificationHookConsumer implements Service, ActiveStateChangeHandler {
private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class);
+ private static Logger FAILED_LOG = LoggerFactory.getLogger("FAILED");
+
private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName();
public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
+ public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries";
+ public static final String CONSUMER_FAILEDCACHESIZE_PROPERTY = "atlas.notification.hook.failedcachesize";
+
public static final int SERVER_READY_WAIT_TIME_MS = 1000;
private final LocalAtlasClient atlasClient;
+ private final int maxRetries;
+ private final int failedMsgCacheSize;
private NotificationInterface notificationInterface;
private ExecutorService executors;
@@ -58,20 +66,23 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private List<HookConsumer> consumers;
@Inject
- public NotificationHookConsumer(NotificationInterface notificationInterface, LocalAtlasClient atlasClient) {
+ public NotificationHookConsumer(NotificationInterface notificationInterface, LocalAtlasClient atlasClient)
+ throws AtlasException {
this.notificationInterface = notificationInterface;
this.atlasClient = atlasClient;
+ this.applicationProperties = ApplicationProperties.get();
+
+ maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3);
+ failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 20);
+
}
@Override
public void start() throws AtlasException {
- Configuration configuration = ApplicationProperties.get();
- startInternal(configuration, null);
+ startInternal(applicationProperties, null);
}
- void startInternal(Configuration configuration,
- ExecutorService executorService) {
- this.applicationProperties = configuration;
+ void startInternal(Configuration configuration, ExecutorService executorService) {
if (consumers == null) {
consumers = new ArrayList<>();
}
@@ -103,16 +114,16 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
@Override
public void stop() {
//Allow for completion of outstanding work
- notificationInterface.close();
try {
+ stopConsumerThreads();
if (executors != null) {
- stopConsumerThreads();
- executors.shutdownNow();
+ executors.shutdown();
if (!executors.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
LOG.error("Timed out waiting for consumer threads to shut down, exiting uncleanly");
}
executors = null;
}
+ notificationInterface.close();
} catch (InterruptedException e) {
LOG.error("Failure in shutting down consumers");
}
@@ -160,6 +171,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
class HookConsumer implements Runnable {
private final NotificationConsumer<HookNotification.HookNotificationMessage> consumer;
private final AtomicBoolean shouldRun = new AtomicBoolean(false);
+ private List<HookNotification.HookNotificationMessage> failedMessages = new ArrayList<>();
public HookConsumer(NotificationConsumer<HookNotification.HookNotificationMessage> consumer) {
this.consumer = consumer;
@@ -193,45 +205,71 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
@VisibleForTesting
- void handleMessage(HookNotification.HookNotificationMessage message) {
- atlasClient.setUser(message.getUser());
- try {
- switch (message.getType()) {
- case ENTITY_CREATE:
- HookNotification.EntityCreateRequest createRequest =
+ void handleMessage(HookNotification.HookNotificationMessage message) throws
+ AtlasServiceException, AtlasException {
+ for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
+ LOG.debug("Running attempt {}", numRetries);
+ try {
+ atlasClient.setUser(message.getUser());
+ switch (message.getType()) {
+ case ENTITY_CREATE:
+ HookNotification.EntityCreateRequest createRequest =
(HookNotification.EntityCreateRequest) message;
- atlasClient.createEntity(createRequest.getEntities());
- break;
+ atlasClient.createEntity(createRequest.getEntities());
+ break;
- case ENTITY_PARTIAL_UPDATE:
- HookNotification.EntityPartialUpdateRequest partialUpdateRequest =
+ case ENTITY_PARTIAL_UPDATE:
+ HookNotification.EntityPartialUpdateRequest partialUpdateRequest =
(HookNotification.EntityPartialUpdateRequest) message;
- atlasClient.updateEntity(partialUpdateRequest.getTypeName(),
+ atlasClient.updateEntity(partialUpdateRequest.getTypeName(),
partialUpdateRequest.getAttribute(),
partialUpdateRequest.getAttributeValue(), partialUpdateRequest.getEntity());
- break;
+ break;
- case ENTITY_DELETE:
- HookNotification.EntityDeleteRequest deleteRequest =
- (HookNotification.EntityDeleteRequest) message;
- atlasClient.deleteEntity(deleteRequest.getTypeName(),
- deleteRequest.getAttribute(),
- deleteRequest.getAttributeValue());
- break;
+ case ENTITY_DELETE:
+ HookNotification.EntityDeleteRequest deleteRequest =
+ (HookNotification.EntityDeleteRequest) message;
+ atlasClient.deleteEntity(deleteRequest.getTypeName(),
+ deleteRequest.getAttribute(),
+ deleteRequest.getAttributeValue());
+ break;
- case ENTITY_FULL_UPDATE:
- HookNotification.EntityUpdateRequest updateRequest =
+ case ENTITY_FULL_UPDATE:
+ HookNotification.EntityUpdateRequest updateRequest =
(HookNotification.EntityUpdateRequest) message;
- atlasClient.updateEntities(updateRequest.getEntities());
- break;
+ atlasClient.updateEntities(updateRequest.getEntities());
+ break;
- default:
- throw new IllegalStateException("Unhandled exception!");
+ default:
+ throw new IllegalStateException("Unhandled exception!");
+ }
+
+ break;
+ } catch (Throwable e) {
+ LOG.warn("Error handling message", e);
+ if (numRetries == (maxRetries - 1)) {
+ LOG.warn("Max retries exceeded for message {}", message, e);
+ failedMessages.add(message);
+ if (failedMessages.size() >= failedMsgCacheSize) {
+ recordFailedMessages();
+ }
+ return;
+ }
}
- } catch (Exception e) {
- //todo handle failures
- LOG.warn("Error handling message {}", message, e);
}
+ commit();
+ }
+
+ private void recordFailedMessages() {
+ //logging failed messages
+ for (HookNotification.HookNotificationMessage message : failedMessages) {
+ FAILED_LOG.error("[DROPPED_NOTIFICATION] " + AbstractNotification.getMessageJson(message));
+ }
+ failedMessages.clear();
+ }
+
+ private void commit() {
+ recordFailedMessages();
consumer.commit();
}
@@ -260,6 +298,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
public void stop() {
shouldRun.set(false);
+ consumer.close();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
index 0a7c5df..a1d3187 100755
--- a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
+++ b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
@@ -155,8 +155,11 @@ public class GuiceServletConfig extends GuiceServletContextListener {
@Override
public void contextDestroyed(ServletContextEvent servletContextEvent) {
- super.contextDestroyed(servletContextEvent);
+ LOG.info("Starting servlet context destroy");
if(injector != null) {
+ //stop services
+ stopServices();
+
TypeLiteral<GraphProvider<TitanGraph>> graphProviderType = new TypeLiteral<GraphProvider<TitanGraph>>() {};
Provider<GraphProvider<TitanGraph>> graphProvider = injector.getProvider(Key.get(graphProviderType));
final Graph graph = graphProvider.get().get();
@@ -166,15 +169,13 @@ public class GuiceServletConfig extends GuiceServletContextListener {
} catch(Throwable t) {
LOG.warn("Error while shutting down graph", t);
}
-
- //stop services
- stopServices();
}
+ super.contextDestroyed(servletContextEvent);
}
protected void stopServices() {
- LOG.debug("Stopping services");
+ LOG.info("Stopping services");
Services services = injector.getInstance(Services.class);
services.stop();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/webapp/src/main/webapp/WEB-INF/web.xml
----------------------------------------------------------------------
diff --git a/webapp/src/main/webapp/WEB-INF/web.xml b/webapp/src/main/webapp/WEB-INF/web.xml
index 34b6856..2e36b94 100755
--- a/webapp/src/main/webapp/WEB-INF/web.xml
+++ b/webapp/src/main/webapp/WEB-INF/web.xml
@@ -63,11 +63,11 @@
</filter-mapping>
<listener>
- <listener-class>org.apache.atlas.web.listeners.GuiceServletConfig</listener-class>
+ <listener-class>org.springframework.web.util.Log4jConfigListener</listener-class>
</listener>
-
+
<listener>
- <listener-class>org.springframework.web.util.Log4jConfigListener</listener-class>
+ <listener-class>org.apache.atlas.web.listeners.GuiceServletConfig</listener-class>
</listener>
<listener>
@@ -80,4 +80,4 @@
-</web-app>
\ No newline at end of file
+</web-app>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index 6fd1939..683a028 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -21,11 +21,13 @@ package org.apache.atlas.notification;
import com.google.inject.Inject;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
+import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.LocalAtlasClient;
import org.apache.atlas.kafka.KafkaNotification;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.commons.lang.RandomStringUtils;
+import org.testng.Assert;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Guice;
@@ -54,7 +56,6 @@ public class NotificationHookConsumerKafkaTest {
@Test
public void testConsumerConsumesNewMessageWithAutoCommitDisabled() throws AtlasException, InterruptedException {
-
produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity()));
NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
@@ -68,7 +69,6 @@ public class NotificationHookConsumerKafkaTest {
consumeOneMessage(consumer, hookConsumer);
verify(localAtlasClient).setUser("test_user1");
-
// produce another message, and make sure it moves ahead. If commit succeeded, this would work.
produceMessage(new HookNotification.EntityCreateRequest("test_user2", createEntity()));
consumeOneMessage(consumer, hookConsumer);
@@ -77,10 +77,8 @@ public class NotificationHookConsumerKafkaTest {
kafkaNotification.close();
}
- @Test
- public void testConsumerRemainsAtSameMessageWithAutoCommitEnabled()
- throws NotificationException, InterruptedException {
-
+ @Test(dependsOnMethods = "testConsumerConsumesNewMessageWithAutoCommitDisabled")
+ public void testConsumerRemainsAtSameMessageWithAutoCommitEnabled() throws Exception {
produceMessage(new HookNotification.EntityCreateRequest("test_user3", createEntity()));
NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
@@ -114,7 +112,14 @@ public class NotificationHookConsumerKafkaTest {
while (!consumer.hasNext()) {
Thread.sleep(1000);
}
- hookConsumer.handleMessage(consumer.next());
+
+ try {
+ hookConsumer.handleMessage(consumer.next());
+ } catch (AtlasServiceException e) {
+ Assert.fail("Consumer failed with exception ", e);
+ } catch (AtlasException e) {
+ Assert.fail("Consumer failed with exception ", e);
+ }
}
Referenceable createEntity() {
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9ea1ad6d/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
index d22c5f1..f06f791 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -18,10 +18,12 @@
package org.apache.atlas.notification;
import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.LocalAtlasClient;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.typesystem.Referenceable;
import org.apache.commons.configuration.Configuration;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
@@ -62,7 +64,7 @@ public class NotificationHookConsumerTest {
}
@Test
- public void testConsumerCanProceedIfServerIsReady() throws InterruptedException, AtlasServiceException {
+ public void testConsumerCanProceedIfServerIsReady() throws Exception {
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
@@ -75,7 +77,7 @@ public class NotificationHookConsumerTest {
}
@Test
- public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws AtlasServiceException, InterruptedException {
+ public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws Exception {
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
@@ -88,7 +90,7 @@ public class NotificationHookConsumerTest {
}
@Test
- public void testCommitIsCalledWhenMessageIsProcessed() throws AtlasServiceException {
+ public void testCommitIsCalledWhenMessageIsProcessed() throws AtlasServiceException, AtlasException {
NotificationHookConsumer notificationHookConsumer =
new NotificationHookConsumer(notificationInterface, atlasClient);
NotificationConsumer consumer = mock(NotificationConsumer.class);
@@ -104,25 +106,22 @@ public class NotificationHookConsumerTest {
}
@Test
- public void testCommitIsCalledEvenWhenMessageProcessingFails() throws AtlasServiceException {
+ public void testCommitIsNotCalledEvenWhenMessageProcessingFails() throws AtlasServiceException, AtlasException {
NotificationHookConsumer notificationHookConsumer =
new NotificationHookConsumer(notificationInterface, atlasClient);
NotificationConsumer consumer = mock(NotificationConsumer.class);
NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(consumer);
- HookNotification.EntityCreateRequest message = mock(HookNotification.EntityCreateRequest.class);
- when(message.getUser()).thenReturn("user");
- when(message.getType()).thenReturn(HookNotification.HookNotificationType.ENTITY_CREATE);
+ HookNotification.EntityCreateRequest message = new HookNotification.EntityCreateRequest("user", new ArrayList<Referenceable>());
when(atlasClient.createEntity(any(List.class))).
thenThrow(new RuntimeException("Simulating exception in processing message"));
-
hookConsumer.handleMessage(message);
- verify(consumer).commit();
+ verifyZeroInteractions(consumer);
}
@Test
- public void testConsumerProceedsWithFalseIfInterrupted() throws AtlasServiceException, InterruptedException {
+ public void testConsumerProceedsWithFalseIfInterrupted() throws Exception {
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
@@ -134,7 +133,7 @@ public class NotificationHookConsumerTest {
}
@Test
- public void testConsumerProceedsWithFalseOnAtlasServiceException() throws AtlasServiceException {
+ public void testConsumerProceedsWithFalseOnAtlasServiceException() throws Exception {
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
@@ -146,7 +145,7 @@ public class NotificationHookConsumerTest {
}
@Test
- public void testConsumersStartedIfHAIsDisabled() {
+ public void testConsumersStartedIfHAIsDisabled() throws Exception {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false);
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
List<NotificationConsumer<Object>> consumers = new ArrayList();
@@ -160,7 +159,7 @@ public class NotificationHookConsumerTest {
}
@Test
- public void testConsumersAreNotStartedIfHAIsEnabled() {
+ public void testConsumersAreNotStartedIfHAIsEnabled() throws Exception {
when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
@@ -174,7 +173,7 @@ public class NotificationHookConsumerTest {
}
@Test
- public void testConsumersAreStartedWhenInstanceBecomesActive() {
+ public void testConsumersAreStartedWhenInstanceBecomesActive() throws Exception {
when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
@@ -190,7 +189,7 @@ public class NotificationHookConsumerTest {
}
@Test
- public void testConsumersAreStoppedWhenInstanceBecomesPassive() {
+ public void testConsumersAreStoppedWhenInstanceBecomesPassive() throws Exception {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
List<NotificationConsumer<Object>> consumers = new ArrayList();
@@ -201,6 +200,6 @@ public class NotificationHookConsumerTest {
notificationHookConsumer.startInternal(configuration, executorService);
notificationHookConsumer.instanceIsPassive();
verify(notificationInterface).close();
- verify(executorService).shutdownNow();
+ verify(executorService).shutdown();
}
}