You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/02/20 04:45:25 UTC
incubator-atlas git commit: ATLAS-1569: cleared contents of
RequestContextV1 at the end of the request
Repository: incubator-atlas
Updated Branches:
refs/heads/master 343d0b1f6 -> bb1c386a7
ATLAS-1569: cleared contents of RequestContextV1 at the end of the request
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/bb1c386a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/bb1c386a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/bb1c386a
Branch: refs/heads/master
Commit: bb1c386a7c6c2d649f09b103105712a78ec32507
Parents: 343d0b1
Author: Madhan Neethiraj <ma...@apache.org>
Authored: Sat Feb 18 23:01:13 2017 -0800
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Sun Feb 19 20:43:40 2017 -0800
----------------------------------------------------------------------
.../org/apache/atlas/AtlasEntitiesClientV2.java | 8 +--
.../notification/NotificationHookConsumer.java | 56 ++++++++++++--------
.../apache/atlas/web/filters/AuditFilter.java | 3 ++
3 files changed, 40 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bb1c386a/client/src/main/java/org/apache/atlas/AtlasEntitiesClientV2.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/AtlasEntitiesClientV2.java b/client/src/main/java/org/apache/atlas/AtlasEntitiesClientV2.java
index 9ad9c16..b16bb58 100644
--- a/client/src/main/java/org/apache/atlas/AtlasEntitiesClientV2.java
+++ b/client/src/main/java/org/apache/atlas/AtlasEntitiesClientV2.java
@@ -43,11 +43,11 @@ public class AtlasEntitiesClientV2 extends AtlasBaseClient {
private static final APIInfo GET_ENTITY_BY_GUID = new APIInfo(ENTITY_API + "guid/%s", HttpMethod.GET, Response.Status.OK);
private static final APIInfo GET_ENTITY_BY_ATTRIBUTE = new APIInfo(ENTITY_API + "uniqueAttribute/type/%s", HttpMethod.GET, Response.Status.OK);
- private static final APIInfo CREATE_ENTITY = new APIInfo(ENTITY_API, HttpMethod.POST, Response.Status.OK);
- private static final APIInfo UPDATE_ENTITY = CREATE_ENTITY;
- private static final APIInfo UPDATE_ENTITY_BY_ATTRIBUTE = new APIInfo(ENTITY_API + "uniqueAttribute/type/%s", HttpMethod.PUT, Response.Status.OK);
+ public static final APIInfo CREATE_ENTITY = new APIInfo(ENTITY_API, HttpMethod.POST, Response.Status.OK);
+ public static final APIInfo UPDATE_ENTITY = CREATE_ENTITY;
+ public static final APIInfo UPDATE_ENTITY_BY_ATTRIBUTE = new APIInfo(ENTITY_API + "uniqueAttribute/type/%s", HttpMethod.PUT, Response.Status.OK);
private static final APIInfo DELETE_ENTITY_BY_GUID = new APIInfo(ENTITY_API + "guid/%s", HttpMethod.DELETE, Response.Status.OK);
- private static final APIInfo DELETE_ENTITY_BY_ATTRIBUTE = new APIInfo(ENTITY_API + "uniqueAttribute/type/%s", HttpMethod.DELETE, Response.Status.OK);
+ public static final APIInfo DELETE_ENTITY_BY_ATTRIBUTE = new APIInfo(ENTITY_API + "uniqueAttribute/type/%s", HttpMethod.DELETE, Response.Status.OK);
private static final APIInfo GET_ENTITIES_BY_GUIDS = new APIInfo(ENTITY_API + "bulk/", HttpMethod.GET, Response.Status.OK);
private static final APIInfo CREATE_ENTITIES = new APIInfo(ENTITY_API + "bulk/", HttpMethod.POST, Response.Status.OK);
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bb1c386a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index c16fd66..44c7995 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -22,9 +22,9 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Singleton;
import kafka.consumer.ConsumerTimeoutException;
import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.RequestContextV1;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.model.instance.AtlasEntity;
@@ -55,6 +55,10 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import static org.apache.atlas.AtlasEntitiesClientV2.CREATE_ENTITY;
+import static org.apache.atlas.AtlasEntitiesClientV2.DELETE_ENTITY_BY_ATTRIBUTE;
+import static org.apache.atlas.AtlasEntitiesClientV2.UPDATE_ENTITY;
+import static org.apache.atlas.AtlasEntitiesClientV2.UPDATE_ENTITY_BY_ATTRIBUTE;
import static org.apache.atlas.notification.hook.HookNotification.EntityCreateRequest;
import static org.apache.atlas.notification.hook.HookNotification.EntityDeleteRequest;
import static org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequest;
@@ -242,16 +246,18 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
AtlasEntity.AtlasEntitiesWithExtInfo entities;
for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Running attempt {}", numRetries);
+ LOG.debug("handleMessage({}): attempt {}", message.getType().name(), numRetries);
}
try {
+ RequestContextV1.get().setUser(messageUser);
+
switch (message.getType()) {
case ENTITY_CREATE:
- if (LOG.isDebugEnabled()) {
- LOG.debug("EntityCreate via hook");
- }
EntityCreateRequest createRequest = (EntityCreateRequest) message;
- audit(messageUser, AtlasClient.API.CREATE_ENTITY);
+
+ if (numRetries == 0) { // audit only on the first attempt
+ audit(messageUser, CREATE_ENTITY.getMethod(), CREATE_ENTITY.getPath());
+ }
entities = instanceConverter.toAtlasEntities(createRequest.getEntities());
@@ -259,11 +265,12 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
break;
case ENTITY_PARTIAL_UPDATE:
- if (LOG.isDebugEnabled()) {
- LOG.debug("EntityPartialUpdate via hook");
- }
final EntityPartialUpdateRequest partialUpdateRequest = (EntityPartialUpdateRequest) message;
- audit(messageUser, AtlasClient.API.UPDATE_ENTITY_PARTIAL);
+
+ if (numRetries == 0) { // audit only on the first attempt
+ audit(messageUser, UPDATE_ENTITY_BY_ATTRIBUTE.getMethod(),
+ String.format(UPDATE_ENTITY_BY_ATTRIBUTE.getPath(), partialUpdateRequest.getTypeName()));
+ }
Referenceable referenceable = partialUpdateRequest.getEntity();
entities = instanceConverter.toAtlasEntity(referenceable);
@@ -280,11 +287,12 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
break;
case ENTITY_DELETE:
- if (LOG.isDebugEnabled()) {
- LOG.debug("EntityDelete via hook");
- }
final EntityDeleteRequest deleteRequest = (EntityDeleteRequest) message;
- audit(messageUser, AtlasClient.API.DELETE_ENTITY);
+
+ if (numRetries == 0) { // audit only on the first attempt
+ audit(messageUser, DELETE_ENTITY_BY_ATTRIBUTE.getMethod(),
+ String.format(DELETE_ENTITY_BY_ATTRIBUTE.getPath(), deleteRequest.getTypeName()));
+ }
try {
AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(deleteRequest.getTypeName());
@@ -296,23 +304,23 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
break;
case ENTITY_FULL_UPDATE:
- if (LOG.isDebugEnabled()) {
- LOG.debug("EntityFullUpdate via hook");
- }
EntityUpdateRequest updateRequest = (EntityUpdateRequest) message;
- audit(messageUser, AtlasClient.API.UPDATE_ENTITY);
+
+ if (numRetries == 0) { // audit only on the first attempt
+ audit(messageUser, UPDATE_ENTITY.getMethod(), UPDATE_ENTITY.getPath());
+ }
entities = instanceConverter.toAtlasEntities(updateRequest.getEntities());
atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
break;
default:
- throw new IllegalStateException("Unhandled exception!");
+ throw new IllegalStateException("Unknown notification type: " + message.getType().name());
}
break;
} catch (Throwable e) {
- LOG.warn("Error handling message: {}", e.getMessage());
+ LOG.warn("Error handling message", e);
try{
LOG.info("Sleeping for {} ms before retry", consumerRetryInterval);
Thread.sleep(consumerRetryInterval);
@@ -328,6 +336,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
return;
}
+ } finally {
+ RequestContextV1.clear();
}
}
commit();
@@ -375,12 +385,12 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
}
- private void audit(String messageUser, AtlasClient.API api) {
+ private void audit(String messageUser, String method, String path) {
if (LOG.isDebugEnabled()) {
- LOG.debug("==> audit({},{})", messageUser, api);
+ LOG.debug("==> audit({},{}, {})", messageUser, method, path);
}
- AuditFilter.audit(messageUser, THREADNAME_PREFIX, api.getMethod(), LOCALHOST, api.getPath(), LOCALHOST,
+ AuditFilter.audit(messageUser, THREADNAME_PREFIX, method, LOCALHOST, path, LOCALHOST,
DateTimeHelper.formatDateUTC(new Date()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bb1c386a/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java b/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
index 030788a..d804f21 100755
--- a/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
+++ b/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
@@ -23,6 +23,7 @@ import com.google.inject.Singleton;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext;
+import org.apache.atlas.RequestContextV1;
import org.apache.atlas.metrics.Metrics;
import org.apache.commons.configuration.Configuration;
import org.apache.atlas.util.AtlasRepositoryConfiguration;
@@ -73,6 +74,7 @@ public class AuditFilter implements Filter {
currentThread.setName(formatName(oldName, requestId));
RequestContext requestContext = RequestContext.createContext();
requestContext.setUser(user);
+ RequestContextV1.get().setUser(user);
recordAudit(httpRequest, requestTimeISO9601, user);
filterChain.doFilter(request, response);
} finally {
@@ -81,6 +83,7 @@ public class AuditFilter implements Filter {
currentThread.setName(oldName);
recordMetrics();
RequestContext.clear();
+ RequestContextV1.clear();
}
}