You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2017/08/10 16:55:04 UTC
atlas git commit: ATLAS-2033: HookConsumer updated to address case
where consumer is stopped before starting. Updated unit tests.
Repository: atlas
Updated Branches:
refs/heads/master 02a6e4771 -> 6f9684b4f
ATLAS-2033: HookConsumer updated to address case where consumer is stopped before starting. Updated unit tests.
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/6f9684b4
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/6f9684b4
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/6f9684b4
Branch: refs/heads/master
Commit: 6f9684b4fb0a1c96993df900305d0c45c9a4e32f
Parents: 02a6e47
Author: ashutoshm <am...@hortonworks.com>
Authored: Thu Aug 10 09:54:38 2017 -0700
Committer: ashutoshm <am...@hortonworks.com>
Committed: Thu Aug 10 09:54:38 2017 -0700
----------------------------------------------------------------------
.../notification/NotificationHookConsumer.java | 18 +++++--
.../NotificationHookConsumerTest.java | 53 ++++++++++++++++----
2 files changed, 57 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/atlas/blob/6f9684b4/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 5a982bb..ef64c3b 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -97,7 +97,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private NotificationInterface notificationInterface;
private ExecutorService executors;
private Configuration applicationProperties;
- private List<HookConsumer> consumers;
+
+ @VisibleForTesting
+ List<HookConsumer> consumers;
@Inject
public NotificationHookConsumer(NotificationInterface notificationInterface, AtlasEntityStore atlasEntityStore,
@@ -212,6 +214,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
}
+ @VisibleForTesting
class HookConsumer extends ShutdownableThread {
private final NotificationConsumer<HookNotificationMessage> consumer;
private final AtomicBoolean shouldRun = new AtomicBoolean(false);
@@ -419,6 +422,12 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
public void shutdown() {
LOG.info("==> HookConsumer shutdown()");
+ // handle the case where thread was not started at all
+ // and shutdown called
+ if(shouldRun.get() == false) {
+ return;
+ }
+
super.initiateShutdown();
shouldRun.set(false);
if (consumer != null) {
@@ -428,7 +437,6 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
LOG.info("<== HookConsumer shutdown()");
}
-
}
private void audit(String messageUser, String method, String path) {
http://git-wip-us.apache.org/repos/asf/atlas/blob/6f9684b4/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
index f4ec56a..a6f58e8 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -33,11 +33,14 @@ import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.web.service.ServiceState;
import org.apache.commons.configuration.Configuration;
+import org.apache.kafka.common.TopicPartition;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import org.apache.kafka.common.TopicPartition;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -48,7 +51,6 @@ import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertTrue;
public class NotificationHookConsumerTest {
-
@Mock
private NotificationInterface notificationInterface;
@@ -126,7 +128,7 @@ public class NotificationHookConsumerTest {
when(message.getEntities()).thenReturn(Arrays.asList(mock));
hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, -1));
- verify(consumer).commit(any(TopicPartition.class),anyInt());
+ verify(consumer).commit(any(TopicPartition.class), anyInt());
}
@Test
@@ -138,8 +140,10 @@ public class NotificationHookConsumerTest {
notificationHookConsumer.new HookConsumer(consumer);
HookNotification.EntityCreateRequest message = new HookNotification.EntityCreateRequest("user",
new ArrayList<Referenceable>() {
- { add(mock(Referenceable.class)); }
- });
+ {
+ add(mock(Referenceable.class));
+ }
+ });
when(atlasEntityStore.createOrUpdate(any(EntityStream.class), anyBoolean())).thenThrow(new RuntimeException("Simulating exception in processing message"));
hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, -1));
@@ -204,13 +208,44 @@ public class NotificationHookConsumerTest {
@Test
public void testConsumersAreStoppedWhenInstanceBecomesPassive() throws Exception {
+ when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
List<NotificationConsumer<Object>> consumers = new ArrayList();
- consumers.add(mock(NotificationConsumer.class));
- when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
- thenReturn(consumers);
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+ NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class);
+ consumers.add(notificationConsumerMock);
+
+ when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).thenReturn(consumers);
+ final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ notificationHookConsumer.consumers.get(0).start();
+ Thread.sleep(500);
+ return null;
+ }
+ }).when(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
+
+ notificationHookConsumer.startInternal(configuration, executorService);
+ notificationHookConsumer.instanceIsPassive();
+ verify(notificationInterface).close();
+ verify(executorService).shutdown();
+ verify(notificationConsumerMock).wakeup();
+ }
+
+ @Test
+ public void consumersStoppedBeforeStarting() throws Exception {
+ when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
+ when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
+ when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
+ List<NotificationConsumer<Object>> consumers = new ArrayList();
+ NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class);
+ consumers.add(notificationConsumerMock);
+
+ when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).thenReturn(consumers);
+ final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+
notificationHookConsumer.startInternal(configuration, executorService);
notificationHookConsumer.instanceIsPassive();
verify(notificationInterface).close();