You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/07/24 15:57:14 UTC
[09/39] atlas git commit: ATLAS-1944: Implemented ShutdownableThread for HookConsumer
ATLAS-1944: Implemented ShutdownableThread for HookConsumer
Signed-off-by: Madhan Neethiraj <ma...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/18745cf4
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/18745cf4
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/18745cf4
Branch: refs/heads/feature-odf
Commit: 18745cf4b98af9c45e853daa280342dde8da1300
Parents: b0470f5
Author: ashutoshm <am...@hortonworks.com>
Authored: Wed Jul 12 14:43:45 2017 -0700
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Thu Jul 13 13:25:15 2017 -0700
----------------------------------------------------------------------
.../notification/NotificationHookConsumer.java | 56 +++++++++++---------
1 file changed, 31 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/atlas/blob/18745cf4/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 9e5b864..0dea0e2 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -19,6 +19,7 @@ package org.apache.atlas.notification;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import kafka.utils.ShutdownableThread;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
@@ -28,7 +29,11 @@ import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.notification.hook.HookNotification.EntityCreateRequest;
+import org.apache.atlas.notification.hook.HookNotification.EntityDeleteRequest;
import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest;
+import org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequest;
+import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream;
@@ -41,11 +46,12 @@ import org.apache.atlas.web.filters.AuditFilter;
import org.apache.atlas.web.service.ServiceState;
import org.apache.atlas.web.util.DateTimeHelper;
import org.apache.commons.configuration.Configuration;
+import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
-import org.apache.kafka.common.TopicPartition;
+
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Date;
@@ -56,14 +62,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import static org.apache.atlas.AtlasClientV2.CREATE_ENTITY;
-import static org.apache.atlas.AtlasClientV2.DELETE_ENTITY_BY_ATTRIBUTE;
-import static org.apache.atlas.AtlasClientV2.UPDATE_ENTITY;
-import static org.apache.atlas.AtlasClientV2.UPDATE_ENTITY_BY_ATTRIBUTE;
-import static org.apache.atlas.notification.hook.HookNotification.EntityCreateRequest;
-import static org.apache.atlas.notification.hook.HookNotification.EntityDeleteRequest;
-import static org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequest;
-import static org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
+import static org.apache.atlas.AtlasClientV2.*;
/**
* Consumer of notifications from hooks e.g., hive hook etc.
@@ -80,7 +79,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries";
public static final String CONSUMER_FAILEDCACHESIZE_PROPERTY = "atlas.notification.hook.failedcachesize";
- public static final String CONSUMER_RETRY_INTERVAL="atlas.notification.consumer.retry.interval";
+ public static final String CONSUMER_RETRY_INTERVAL = "atlas.notification.consumer.retry.interval";
public static final int SERVER_READY_WAIT_TIME_MS = 1000;
private final AtlasEntityStore atlasEntityStore;
@@ -177,7 +176,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
/**
* Start Kafka consumer threads that read from Kafka topic when server is activated.
- *
+ * <p>
* Since the consumers create / update entities to the shared backend store, only the active instance
* should perform this activity. Hence, these threads are started only on server activation.
*/
@@ -189,7 +188,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
/**
* Stop Kafka consumer threads that read from Kafka topic when server is de-activated.
- *
+ * <p>
* Since the consumers create / update entities to the shared backend store, only the active instance
* should perform this activity. Hence, these threads are stopped only on server deactivation.
*/
@@ -205,18 +204,18 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
}
- class HookConsumer implements Runnable {
+ class HookConsumer extends ShutdownableThread {
private final NotificationConsumer<HookNotificationMessage> consumer;
private final AtomicBoolean shouldRun = new AtomicBoolean(false);
private List<HookNotificationMessage> failedMessages = new ArrayList<>();
public HookConsumer(NotificationConsumer<HookNotificationMessage> consumer) {
+ super("atlas-hook-consumer-thread", false);
this.consumer = consumer;
}
-
@Override
- public void run() {
+ public void doWork() {
shouldRun.set(true);
if (!serverAvailable(new NotificationHookConsumer.Timer())) {
@@ -226,7 +225,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
while (shouldRun.get()) {
try {
List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive(1000L);
- for (AtlasKafkaMessage<HookNotificationMessage> msg : messages){
+ for (AtlasKafkaMessage<HookNotificationMessage> msg : messages) {
handleMessage(msg);
}
} catch (Throwable t) {
@@ -267,15 +266,17 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
if (numRetries == 0) { // audit only on the first attempt
audit(messageUser, UPDATE_ENTITY_BY_ATTRIBUTE.getMethod(),
- String.format(UPDATE_ENTITY_BY_ATTRIBUTE.getPath(), partialUpdateRequest.getTypeName()));
+ String.format(UPDATE_ENTITY_BY_ATTRIBUTE.getPath(), partialUpdateRequest.getTypeName()));
}
Referenceable referenceable = partialUpdateRequest.getEntity();
entities = instanceConverter.toAtlasEntity(referenceable);
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName());
- String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, new HashMap<String, Object>(){
- { put(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue()); }
+ String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, new HashMap<String, Object>() {
+ {
+ put(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue());
+ }
});
// There should only be one root entity
@@ -289,13 +290,15 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
if (numRetries == 0) { // audit only on the first attempt
audit(messageUser, DELETE_ENTITY_BY_ATTRIBUTE.getMethod(),
- String.format(DELETE_ENTITY_BY_ATTRIBUTE.getPath(), deleteRequest.getTypeName()));
+ String.format(DELETE_ENTITY_BY_ATTRIBUTE.getPath(), deleteRequest.getTypeName()));
}
try {
AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(deleteRequest.getTypeName());
atlasEntityStore.deleteByUniqueAttributes(type,
- new HashMap<String, Object>() {{ put(deleteRequest.getAttribute(), deleteRequest.getAttributeValue()); }});
+ new HashMap<String, Object>() {{
+ put(deleteRequest.getAttribute(), deleteRequest.getAttributeValue());
+ }});
} catch (ClassCastException cle) {
LOG.error("Failed to do a partial update on Entity");
}
@@ -319,10 +322,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
break;
} catch (Throwable e) {
LOG.warn("Error handling message", e);
- try{
+ try {
LOG.info("Sleeping for {} ms before retry", consumerRetryInterval);
Thread.sleep(consumerRetryInterval);
- }catch (InterruptedException ie){
+ } catch (InterruptedException ie) {
LOG.error("Notification consumer thread sleep interrupted");
}
@@ -379,9 +382,12 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
return true;
}
- public void stop() {
+ @Override
+ public void shutdown() {
+ super.initiateShutdown();
shouldRun.set(false);
consumer.close();
+ super.awaitShutdown();
}
}
@@ -393,4 +399,4 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
AuditFilter.audit(messageUser, THREADNAME_PREFIX, method, LOCALHOST, path, LOCALHOST,
DateTimeHelper.formatDateUTC(new Date()));
}
-}
+}
\ No newline at end of file