You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/02/20 04:45:25 UTC

incubator-atlas git commit: ATLAS-1569: cleared contents of RequestContextV1 at the end of the request

Repository: incubator-atlas
Updated Branches:
  refs/heads/master 343d0b1f6 -> bb1c386a7


ATLAS-1569: cleared contents of RequestContextV1 at the end of the request


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/bb1c386a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/bb1c386a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/bb1c386a

Branch: refs/heads/master
Commit: bb1c386a7c6c2d649f09b103105712a78ec32507
Parents: 343d0b1
Author: Madhan Neethiraj <ma...@apache.org>
Authored: Sat Feb 18 23:01:13 2017 -0800
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Sun Feb 19 20:43:40 2017 -0800

----------------------------------------------------------------------
 .../org/apache/atlas/AtlasEntitiesClientV2.java |  8 +--
 .../notification/NotificationHookConsumer.java  | 56 ++++++++++++--------
 .../apache/atlas/web/filters/AuditFilter.java   |  3 ++
 3 files changed, 40 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bb1c386a/client/src/main/java/org/apache/atlas/AtlasEntitiesClientV2.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/AtlasEntitiesClientV2.java b/client/src/main/java/org/apache/atlas/AtlasEntitiesClientV2.java
index 9ad9c16..b16bb58 100644
--- a/client/src/main/java/org/apache/atlas/AtlasEntitiesClientV2.java
+++ b/client/src/main/java/org/apache/atlas/AtlasEntitiesClientV2.java
@@ -43,11 +43,11 @@ public class AtlasEntitiesClientV2 extends AtlasBaseClient {
 
     private static final APIInfo GET_ENTITY_BY_GUID         = new APIInfo(ENTITY_API + "guid/%s", HttpMethod.GET, Response.Status.OK);
     private static final APIInfo GET_ENTITY_BY_ATTRIBUTE    = new APIInfo(ENTITY_API + "uniqueAttribute/type/%s", HttpMethod.GET, Response.Status.OK);
-    private static final APIInfo CREATE_ENTITY              = new APIInfo(ENTITY_API, HttpMethod.POST, Response.Status.OK);
-    private static final APIInfo UPDATE_ENTITY              = CREATE_ENTITY;
-    private static final APIInfo UPDATE_ENTITY_BY_ATTRIBUTE = new APIInfo(ENTITY_API + "uniqueAttribute/type/%s", HttpMethod.PUT, Response.Status.OK);
+    public  static final APIInfo CREATE_ENTITY              = new APIInfo(ENTITY_API, HttpMethod.POST, Response.Status.OK);
+    public  static final APIInfo UPDATE_ENTITY              = CREATE_ENTITY;
+    public  static final APIInfo UPDATE_ENTITY_BY_ATTRIBUTE = new APIInfo(ENTITY_API + "uniqueAttribute/type/%s", HttpMethod.PUT, Response.Status.OK);
     private static final APIInfo DELETE_ENTITY_BY_GUID      = new APIInfo(ENTITY_API + "guid/%s", HttpMethod.DELETE, Response.Status.OK);
-    private static final APIInfo DELETE_ENTITY_BY_ATTRIBUTE = new APIInfo(ENTITY_API + "uniqueAttribute/type/%s", HttpMethod.DELETE, Response.Status.OK);
+    public  static final APIInfo DELETE_ENTITY_BY_ATTRIBUTE = new APIInfo(ENTITY_API + "uniqueAttribute/type/%s", HttpMethod.DELETE, Response.Status.OK);
 
     private static final APIInfo GET_ENTITIES_BY_GUIDS    = new APIInfo(ENTITY_API + "bulk/", HttpMethod.GET, Response.Status.OK);
     private static final APIInfo CREATE_ENTITIES          = new APIInfo(ENTITY_API + "bulk/", HttpMethod.POST, Response.Status.OK);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bb1c386a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index c16fd66..44c7995 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -22,9 +22,9 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Singleton;
 import kafka.consumer.ConsumerTimeoutException;
 import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.RequestContextV1;
 import org.apache.atlas.ha.HAConfiguration;
 import org.apache.atlas.listener.ActiveStateChangeHandler;
 import org.apache.atlas.model.instance.AtlasEntity;
@@ -55,6 +55,10 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.apache.atlas.AtlasEntitiesClientV2.CREATE_ENTITY;
+import static org.apache.atlas.AtlasEntitiesClientV2.DELETE_ENTITY_BY_ATTRIBUTE;
+import static org.apache.atlas.AtlasEntitiesClientV2.UPDATE_ENTITY;
+import static org.apache.atlas.AtlasEntitiesClientV2.UPDATE_ENTITY_BY_ATTRIBUTE;
 import static org.apache.atlas.notification.hook.HookNotification.EntityCreateRequest;
 import static org.apache.atlas.notification.hook.HookNotification.EntityDeleteRequest;
 import static org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequest;
@@ -242,16 +246,18 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             AtlasEntity.AtlasEntitiesWithExtInfo entities;
             for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Running attempt {}", numRetries);
+                    LOG.debug("handleMessage({}): attempt {}", message.getType().name(), numRetries);
                 }
                 try {
+                    RequestContextV1.get().setUser(messageUser);
+
                     switch (message.getType()) {
                         case ENTITY_CREATE:
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("EntityCreate via hook");
-                            }
                             EntityCreateRequest createRequest = (EntityCreateRequest) message;
-                            audit(messageUser, AtlasClient.API.CREATE_ENTITY);
+
+                            if (numRetries == 0) { // audit only on the first attempt
+                                audit(messageUser, CREATE_ENTITY.getMethod(), CREATE_ENTITY.getPath());
+                            }
 
                             entities = instanceConverter.toAtlasEntities(createRequest.getEntities());
 
@@ -259,11 +265,12 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                             break;
 
                         case ENTITY_PARTIAL_UPDATE:
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("EntityPartialUpdate via hook");
-                            }
                             final EntityPartialUpdateRequest partialUpdateRequest = (EntityPartialUpdateRequest) message;
-                            audit(messageUser, AtlasClient.API.UPDATE_ENTITY_PARTIAL);
+
+                            if (numRetries == 0) { // audit only on the first attempt
+                                audit(messageUser, UPDATE_ENTITY_BY_ATTRIBUTE.getMethod(),
+                                      String.format(UPDATE_ENTITY_BY_ATTRIBUTE.getPath(), partialUpdateRequest.getTypeName()));
+                            }
 
                             Referenceable referenceable = partialUpdateRequest.getEntity();
                             entities = instanceConverter.toAtlasEntity(referenceable);
@@ -280,11 +287,12 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                             break;
 
                         case ENTITY_DELETE:
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("EntityDelete via hook");
-                            }
                             final EntityDeleteRequest deleteRequest = (EntityDeleteRequest) message;
-                            audit(messageUser, AtlasClient.API.DELETE_ENTITY);
+
+                            if (numRetries == 0) { // audit only on the first attempt
+                                audit(messageUser, DELETE_ENTITY_BY_ATTRIBUTE.getMethod(),
+                                      String.format(DELETE_ENTITY_BY_ATTRIBUTE.getPath(), deleteRequest.getTypeName()));
+                            }
 
                             try {
                                 AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(deleteRequest.getTypeName());
@@ -296,23 +304,23 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                             break;
 
                         case ENTITY_FULL_UPDATE:
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("EntityFullUpdate via hook");
-                            }
                             EntityUpdateRequest updateRequest = (EntityUpdateRequest) message;
-                            audit(messageUser, AtlasClient.API.UPDATE_ENTITY);
+
+                            if (numRetries == 0) { // audit only on the first attempt
+                                audit(messageUser, UPDATE_ENTITY.getMethod(), UPDATE_ENTITY.getPath());
+                            }
 
                             entities = instanceConverter.toAtlasEntities(updateRequest.getEntities());
                             atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
                             break;
 
                         default:
-                            throw new IllegalStateException("Unhandled exception!");
+                            throw new IllegalStateException("Unknown notification type: " + message.getType().name());
                     }
 
                     break;
                 } catch (Throwable e) {
-                    LOG.warn("Error handling message: {}", e.getMessage());
+                    LOG.warn("Error handling message", e);
                     try{
                         LOG.info("Sleeping for {} ms before retry", consumerRetryInterval);
                         Thread.sleep(consumerRetryInterval);
@@ -328,6 +336,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                         }
                         return;
                     }
+                } finally {
+                    RequestContextV1.clear();
                 }
             }
             commit();
@@ -375,12 +385,12 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         }
     }
 
-    private void audit(String messageUser, AtlasClient.API api) {
+    private void audit(String messageUser, String method, String path) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("==> audit({},{})", messageUser, api);
+            LOG.debug("==> audit({},{}, {})", messageUser, method, path);
         }
 
-        AuditFilter.audit(messageUser, THREADNAME_PREFIX, api.getMethod(), LOCALHOST, api.getPath(), LOCALHOST,
+        AuditFilter.audit(messageUser, THREADNAME_PREFIX, method, LOCALHOST, path, LOCALHOST,
                 DateTimeHelper.formatDateUTC(new Date()));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bb1c386a/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java b/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
index 030788a..d804f21 100755
--- a/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
+++ b/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
@@ -23,6 +23,7 @@ import com.google.inject.Singleton;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.RequestContext;
+import org.apache.atlas.RequestContextV1;
 import org.apache.atlas.metrics.Metrics;
 import org.apache.commons.configuration.Configuration;
 import org.apache.atlas.util.AtlasRepositoryConfiguration;
@@ -73,6 +74,7 @@ public class AuditFilter implements Filter {
             currentThread.setName(formatName(oldName, requestId));
             RequestContext requestContext = RequestContext.createContext();
             requestContext.setUser(user);
+            RequestContextV1.get().setUser(user);
             recordAudit(httpRequest, requestTimeISO9601, user);
             filterChain.doFilter(request, response);
         } finally {
@@ -81,6 +83,7 @@ public class AuditFilter implements Filter {
             currentThread.setName(oldName);
             recordMetrics();
             RequestContext.clear();
+            RequestContextV1.clear();
         }
     }