You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2018/12/18 23:06:24 UTC

atlas git commit: ATLAS-3002: added instrumentation to collect time taken for sub-tasks during entity create/update

Repository: atlas
Updated Branches:
  refs/heads/master bd0c5a8a8 -> beb34506a


ATLAS-3002: added instrumentation to collect time taken for sub-tasks during entity create/update


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/beb34506
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/beb34506
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/beb34506

Branch: refs/heads/master
Commit: beb34506a15379af4a306902da049f37b445a2f5
Parents: bd0c5a8
Author: Madhan Neethiraj <ma...@apache.org>
Authored: Tue Dec 18 09:17:29 2018 -0800
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Tue Dec 18 15:02:13 2018 -0800

----------------------------------------------------------------------
 .../hive/hook/events/CreateHiveProcess.java     |   2 +-
 .../apache/atlas/utils/AtlasPerfMetrics.java    | 121 +++++++++++++++++++
 distro/src/conf/atlas-log4j.xml                 |  19 ++-
 .../apache/atlas/kafka/KafkaNotification.java   |   3 +
 .../atlas/GraphTransactionInterceptor.java      |   6 +
 .../repository/audit/EntityAuditListener.java   |  33 +++++
 .../repository/audit/EntityAuditListenerV2.java |  34 +++++-
 .../repository/graph/FullTextMapperV2.java      |  86 +++++++++----
 .../graph/v2/AtlasEntityChangeNotifier.java     |  10 +-
 .../graph/v2/AtlasEntityGraphDiscoveryV2.java   |  10 ++
 .../store/graph/v2/AtlasEntityStoreV2.java      |   9 ++
 .../store/graph/v2/AtlasGraphUtilsV2.java       |  14 +++
 .../store/graph/v2/EntityGraphMapper.java       |  13 ++
 .../java/org/apache/atlas/RequestContext.java   |  22 +++-
 .../EntityNotificationListenerV2.java           |   5 +
 .../NotificationEntityChangeListener.java       |   6 +
 .../notification/NotificationHookConsumer.java  |  12 ++
 17 files changed, 378 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/beb34506/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
index 2ccfff4..d61f1d7 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
@@ -185,7 +185,7 @@ public class CreateHiveProcess extends BaseHiveEvent {
 
             AtlasEntity columnLineageProcess = new AtlasEntity(HIVE_TYPE_COLUMN_LINEAGE);
 
-            columnLineageProcess.setAttribute(ATTRIBUTE_NAME, hiveProcess.getAttribute(ATTRIBUTE_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME));
+            columnLineageProcess.setAttribute(ATTRIBUTE_NAME, hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME));
             columnLineageProcess.setAttribute(ATTRIBUTE_QUALIFIED_NAME, hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME));
             columnLineageProcess.setAttribute(ATTRIBUTE_INPUTS, getObjectIds(inputColumns));
             columnLineageProcess.setAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(getObjectId(outputColumn)));

http://git-wip-us.apache.org/repos/asf/atlas/blob/beb34506/common/src/main/java/org/apache/atlas/utils/AtlasPerfMetrics.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/atlas/utils/AtlasPerfMetrics.java b/common/src/main/java/org/apache/atlas/utils/AtlasPerfMetrics.java
new file mode 100644
index 0000000..c72b2c3
--- /dev/null
+++ b/common/src/main/java/org/apache/atlas/utils/AtlasPerfMetrics.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.utils;
+
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Collects invocation counts and cumulative elapsed time (in milliseconds), keyed by metric name.
+ * <p>
+ * Obtain a {@link MetricRecorder} from {@link #getMetricRecorder(String)} before a sub-task and pass
+ * it to {@link #recordMetric(MetricRecorder)} once the sub-task is done.
+ * <p>
+ * Not thread-safe: the backing {@link LinkedHashMap} and the counters are updated without
+ * synchronization.
+ */
+public class AtlasPerfMetrics {
+    private final Map<String, Metric> metrics = new LinkedHashMap<>();
+
+
+    /**
+     * Starts timing a sub-task.
+     *
+     * @param name metric name the elapsed time is accumulated under
+     * @return a recorder whose clock starts now
+     */
+    public MetricRecorder getMetricRecorder(String name) {
+        return new MetricRecorder(name);
+    }
+
+    /**
+     * Adds the time elapsed since the recorder was created to the metric of the same name and
+     * counts one more invocation.
+     *
+     * @param recorder recorder from {@link #getMetricRecorder(String)}; {@code null} is ignored
+     */
+    public void recordMetric(MetricRecorder recorder) {
+        if (recorder != null) {
+            final String name      = recorder.name;
+            final long   timeTaken = recorder.getElapsedTime();
+
+            Metric metric = metrics.get(name);
+
+            if (metric == null) {
+                metric = new Metric(name);
+
+                metrics.put(name, metric);
+            }
+
+            metric.invocations++;
+            metric.totalTimeMSecs += timeTaken;
+        }
+    }
+
+    /** Discards every metric collected so far. */
+    public void clear() {
+        metrics.clear();
+    }
+
+    /** @return {@code true} when no metric is currently held */
+    public boolean isEmpty() {
+        return metrics.isEmpty();
+    }
+
+    /** @return read-only view of the metric names, in the order they were first recorded */
+    public Set<String> getMetricsNames() {
+        return Collections.unmodifiableSet(metrics.keySet());
+    }
+
+    /**
+     * @param name metric name
+     * @return the metric recorded under {@code name}, or {@code null} if there is none
+     */
+    public Metric getMetric(String name) {
+        return metrics.get(name);
+    }
+
+    /**
+     * @return the metrics rendered like {@code {"name":{"count":1,"timeTaken":12}}};
+     *         names are written as-is, without JSON escaping
+     */
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+
+        sb.append("{");
+
+        if (!metrics.isEmpty()) {
+            for (Metric metric : metrics.values()) {
+                sb.append("\"").append(metric.getName()).append("\":{\"count\":").append(metric.getInvocations()).append(",\"timeTaken\":").append(metric.getTotalTimeMSecs()).append("},");
+            }
+
+            sb.setLength(sb.length() - 1); // remove last ","
+        }
+
+        sb.append("}");
+
+        return sb.toString();
+    }
+
+    /**
+     * Stopwatch for a single sub-task; the clock starts at construction.
+     * Static so that a recorder carries no hidden reference to an AtlasPerfMetrics instance.
+     */
+    public static class MetricRecorder {
+        private static final long NANOS_PER_MSEC = 1000000L;
+
+        private final String name;
+        // nanoTime() is monotonic; currentTimeMillis() can jump when the wall clock is adjusted
+        private final long   startTimeNanos = System.nanoTime();
+
+        MetricRecorder(String name) {
+            this.name = name;
+        }
+
+        /** @return milliseconds elapsed since this recorder was created */
+        long getElapsedTime() {
+            return (System.nanoTime() - startTimeNanos) / NANOS_PER_MSEC;
+        }
+    }
+
+    /** Invocation count and total elapsed time accumulated for one metric name. */
+    public static class Metric {
+        private final String name;
+        // int rather than short: a short counter wraps negative after 32767 recordings
+        private       int    invocations    = 0;
+        private       long   totalTimeMSecs = 0;
+
+        public Metric(String name) {
+            this.name = name;
+        }
+
+        /** @return the metric name */
+        public String getName() {
+            return name;
+        }
+
+        /** @return number of times this metric was recorded */
+        public int getInvocations() {
+            return invocations;
+        }
+
+        /** @return sum of the recorded elapsed times, in milliseconds */
+        public long getTotalTimeMSecs() {
+            return totalTimeMSecs;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/beb34506/distro/src/conf/atlas-log4j.xml
----------------------------------------------------------------------
diff --git a/distro/src/conf/atlas-log4j.xml b/distro/src/conf/atlas-log4j.xml
index c183871..9b4fa04 100755
--- a/distro/src/conf/atlas-log4j.xml
+++ b/distro/src/conf/atlas-log4j.xml
@@ -37,6 +37,16 @@
         </layout>
     </appender>
 
+    <appender name="LARGE_MESSAGES" class="org.apache.log4j.RollingFileAppender">
+        <param name="File" value="${atlas.log.dir}/large_messages.log"/> <!-- same directory property as the AUDIT and FAILED appenders -->
+        <param name="Append" value="true"/>
+        <param name="MaxFileSize" value="100MB" />
+        <param name="MaxBackupIndex" value="20" />
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %m%n"/>
+        </layout>
+    </appender>
+
     <appender name="AUDIT" class="org.apache.log4j.RollingFileAppender">
         <param name="File" value="${atlas.log.dir}/audit.log"/>
         <param name="Append" value="true"/>
@@ -60,7 +70,7 @@
         <param name="File" value="${atlas.log.dir}/failed.log"/>
         <param name="Append" value="true"/>
         <layout class="org.apache.log4j.PatternLayout">
-            <param name="ConversionPattern" value="%d %m"/>
+            <param name="ConversionPattern" value="%d %m%n"/>
             <param name="maxFileSize" value="100MB" />
             <param name="maxBackupIndex" value="20" />
         </layout>
@@ -119,6 +129,11 @@
         <appender-ref ref="AUDIT"/>
     </logger>
 
+    <logger name="LARGE_MESSAGES" additivity="false">
+        <level value="warn"/>
+        <appender-ref ref="LARGE_MESSAGES"/>
+    </logger>
+
     <logger name="METRICS" additivity="false">
         <level value="debug"/>
         <appender-ref ref="METRICS"/>
@@ -126,7 +141,7 @@
 
     <logger name="FAILED" additivity="false">
         <level value="info"/>
-        <appender-ref ref="AUDIT"/>
+        <appender-ref ref="FAILED"/>
     </logger>
 
     <root>

http://git-wip-us.apache.org/repos/asf/atlas/blob/beb34506/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 4bec917..1d0a273 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -106,6 +106,9 @@ public class KafkaNotification extends AbstractNotification implements Service {
         properties.put("enable.auto.commit", kafkaConf.getBoolean("enable.auto.commit", oldApiCommitEnableFlag));
         properties.put("session.timeout.ms", kafkaConf.getString("session.timeout.ms", "30000"));
 
+        // if no value is specified for max.poll.records, set to 1
+        properties.put("max.poll.records", kafkaConf.getInt("max.poll.records", 1));
+
         LOG.info("<== KafkaNotification()");
     }
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/beb34506/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
index 4c43677..cbd2226 100644
--- a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
+++ b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
@@ -23,6 +23,7 @@ import org.aopalliance.intercept.MethodInvocation;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.exception.NotFoundException;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -69,6 +70,7 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
         }
 
         boolean isSuccess = false;
+        MetricRecorder metric = null;
 
         try {
             try {
@@ -79,6 +81,8 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
                         LOG.debug("Ignoring commit for nested/inner transaction {}.{}", invokingClass, invokedMethodName);
                     }
                 } else {
+                    metric = RequestContext.get().startMetricRecord("graphCommit");
+
                     doCommitOrRollback(invokingClass, invokedMethodName);
                 }
 
@@ -97,6 +101,8 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
                 throw t;
             }
         } finally {
+            RequestContext.get().endMetricRecord(metric);
+
             // Only outer txn can mark as closed
             if (!isInnerTxn) {
                 if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/atlas/blob/beb34506/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
index dfacb38..69d373d 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
@@ -24,6 +24,7 @@ import org.apache.atlas.EntityAuditEvent.EntityAuditAction;
 import org.apache.atlas.RequestContext;
 import org.apache.atlas.listener.EntityChangeListener;
 import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
+import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
 import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.atlas.v1.model.instance.Struct;
 import org.apache.atlas.type.AtlasEntityType;
@@ -65,6 +66,8 @@ public class EntityAuditListener implements EntityChangeListener {
 
     @Override
     public void onEntitiesAdded(Collection<Referenceable> entities, boolean isImport) throws AtlasException {
+        MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
         List<EntityAuditEvent> events = new ArrayList<>();
         for (Referenceable entity : entities) {
             EntityAuditEvent event = createEvent(entity, isImport ? EntityAuditAction.ENTITY_IMPORT_CREATE : EntityAuditAction.ENTITY_CREATE);
@@ -72,10 +75,14 @@ public class EntityAuditListener implements EntityChangeListener {
         }
 
         auditRepository.putEventsV1(events);
+
+        RequestContext.get().endMetricRecord(metric);
     }
 
     @Override
     public void onEntitiesUpdated(Collection<Referenceable> entities, boolean isImport) throws AtlasException {
+        MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
         List<EntityAuditEvent> events = new ArrayList<>();
         for (Referenceable entity : entities) {
             EntityAuditEvent event = createEvent(entity, isImport ? EntityAuditAction.ENTITY_IMPORT_UPDATE : EntityAuditAction.ENTITY_UPDATE);
@@ -83,45 +90,61 @@ public class EntityAuditListener implements EntityChangeListener {
         }
 
         auditRepository.putEventsV1(events);
+
+        RequestContext.get().endMetricRecord(metric);
     }
 
     @Override
     public void onTraitsAdded(Referenceable entity, Collection<? extends Struct> traits) throws AtlasException {
         if (traits != null) {
+            MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
             for (Struct trait : traits) {
                 EntityAuditEvent event = createEvent(entity, EntityAuditAction.TAG_ADD,
                                                      "Added trait: " + AtlasType.toV1Json(trait));
 
                 auditRepository.putEventsV1(event);
             }
+
+            RequestContext.get().endMetricRecord(metric);
         }
     }
 
     @Override
     public void onTraitsDeleted(Referenceable entity, Collection<? extends Struct> traits) throws AtlasException {
         if (traits != null) {
+            MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
             for (Struct trait : traits) {
                 EntityAuditEvent event = createEvent(entity, EntityAuditAction.TAG_DELETE, "Deleted trait: " + trait.getTypeName());
 
                 auditRepository.putEventsV1(event);
             }
+
+            RequestContext.get().endMetricRecord(metric);
         }
     }
 
     @Override
     public void onTraitsUpdated(Referenceable entity, Collection<? extends Struct> traits) throws AtlasException {
         if (traits != null) {
+            MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
             for (Struct trait : traits) {
                 EntityAuditEvent event = createEvent(entity, EntityAuditAction.TAG_UPDATE,
                                                      "Updated trait: " + AtlasType.toV1Json(trait));
 
                 auditRepository.putEventsV1(event);
             }
+
+            RequestContext.get().endMetricRecord(metric);
         }
     }
 
     @Override
     public void onEntitiesDeleted(Collection<Referenceable> entities, boolean isImport) throws AtlasException {
+        MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
         List<EntityAuditEvent> events = new ArrayList<>();
         for (Referenceable entity : entities) {
             EntityAuditEvent event = createEvent(entity, isImport ? EntityAuditAction.ENTITY_IMPORT_DELETE : EntityAuditAction.ENTITY_DELETE, "Deleted entity");
@@ -129,10 +152,14 @@ public class EntityAuditListener implements EntityChangeListener {
         }
 
         auditRepository.putEventsV1(events);
+
+        RequestContext.get().endMetricRecord(metric);
     }
 
     @Override
     public void onTermAdded(Collection<Referenceable> entities, AtlasGlossaryTerm term) throws AtlasException {
+        MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
         List<EntityAuditEvent> events = new ArrayList<>();
 
         for (Referenceable entity : entities) {
@@ -140,10 +167,14 @@ public class EntityAuditListener implements EntityChangeListener {
         }
 
         auditRepository.putEventsV1(events);
+
+        RequestContext.get().endMetricRecord(metric);
     }
 
     @Override
     public void onTermDeleted(Collection<Referenceable> entities, AtlasGlossaryTerm term) throws AtlasException {
+        MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
         List<EntityAuditEvent> events = new ArrayList<>();
 
         for (Referenceable entity : entities) {
@@ -151,6 +182,8 @@ public class EntityAuditListener implements EntityChangeListener {
         }
 
         auditRepository.putEventsV1(events);
+
+        RequestContext.get().endMetricRecord(metric);
     }
 
     public List<EntityAuditEvent> getAuditEvents(String guid) throws AtlasException{

http://git-wip-us.apache.org/repos/asf/atlas/blob/beb34506/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
index 8ca8c9a..16d8879 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
@@ -26,13 +26,13 @@ import org.apache.atlas.listener.EntityChangeListenerV2;
 import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
 import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.AtlasRelatedObjectId;
 import org.apache.atlas.repository.converters.AtlasInstanceConverter;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -79,6 +79,8 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
 
     @Override
     public void onEntitiesAdded(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException {
+        MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
         List<EntityAuditEventV2> events = new ArrayList<>();
 
         for (AtlasEntity entity : entities) {
@@ -88,10 +90,14 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
         }
 
         auditRepository.putEventsV2(events);
+
+        RequestContext.get().endMetricRecord(metric);
     }
 
     @Override
     public void onEntitiesUpdated(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException {
+        MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
         List<EntityAuditEventV2> events = new ArrayList<>();
 
         for (AtlasEntity entity : entities) {
@@ -101,10 +107,14 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
         }
 
         auditRepository.putEventsV2(events);
+
+        RequestContext.get().endMetricRecord(metric);
     }
 
     @Override
     public void onEntitiesDeleted(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException {
+        MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
         List<EntityAuditEventV2> events = new ArrayList<>();
 
         for (AtlasEntity entity : entities) {
@@ -114,11 +124,15 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
         }
 
         auditRepository.putEventsV2(events);
+
+        RequestContext.get().endMetricRecord(metric);
     }
 
     @Override
     public void onClassificationsAdded(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
         if (CollectionUtils.isNotEmpty(classifications)) {
+            MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
             List<EntityAuditEventV2> events = new ArrayList<>();
 
             for (AtlasClassification classification : classifications) {
@@ -130,12 +144,16 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
             }
 
             auditRepository.putEventsV2(events);
+
+            RequestContext.get().endMetricRecord(metric);
         }
     }
 
     @Override
     public void onClassificationsUpdated(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
         if (CollectionUtils.isNotEmpty(classifications)) {
+            MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
             List<EntityAuditEventV2> events = new ArrayList<>();
             String                   guid   = entity.getGuid();
 
@@ -154,12 +172,16 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
             }
 
             auditRepository.putEventsV2(events);
+
+            RequestContext.get().endMetricRecord(metric);
         }
     }
 
     @Override
     public void onClassificationsDeleted(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
         if (CollectionUtils.isNotEmpty(classifications)) {
+            MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
             List<EntityAuditEventV2> events = new ArrayList<>();
 
             for (AtlasClassification classification : classifications) {
@@ -171,12 +193,16 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
             }
 
             auditRepository.putEventsV2(events);
+
+            RequestContext.get().endMetricRecord(metric);
         }
     }
 
     @Override
     public void onTermAdded(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entities) throws AtlasBaseException {
         if (term != null && CollectionUtils.isNotEmpty(entities)) {
+            MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
             List<EntityAuditEventV2> events = new ArrayList<>();
 
             for (AtlasRelatedObjectId relatedObjectId : entities) {
@@ -188,12 +214,16 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
             }
 
             auditRepository.putEventsV2(events);
+
+            RequestContext.get().endMetricRecord(metric);
         }
     }
 
     @Override
     public void onTermDeleted(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entities) throws AtlasBaseException {
         if (term != null && CollectionUtils.isNotEmpty(entities)) {
+            MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
             List<EntityAuditEventV2> events = new ArrayList<>();
 
             for (AtlasRelatedObjectId relatedObjectId : entities) {
@@ -205,6 +235,8 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
             }
 
             auditRepository.putEventsV2(events);
+
+            RequestContext.get().endMetricRecord(metric);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/beb34506/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java b/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java
index 08ccd9c..77cecff 100644
--- a/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java
@@ -26,6 +26,13 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.AtlasStruct;
 import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
+import org.apache.atlas.type.AtlasArrayType;
+import org.apache.atlas.type.AtlasBuiltInTypes;
+import org.apache.atlas.type.AtlasClassificationType;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasStructType;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
@@ -37,6 +44,7 @@ import org.springframework.stereotype.Component;
 
 import javax.inject.Inject;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -52,15 +60,18 @@ public class FullTextMapperV2 {
     private static final String FULL_TEXT_FOLLOW_REFERENCES          = "atlas.search.fulltext.followReferences";
     private static final String FULL_TEXT_EXCLUDE_ATTRIBUTE_PROPERTY = "atlas.search.fulltext.type";
 
-    private final EntityGraphRetriever     entityGraphRetriever;
+    private final AtlasTypeRegistry        typeRegistry;
     private final Configuration            configuration;
+    private final EntityGraphRetriever     entityGraphRetriever;
     private final boolean                  followReferences;
     private final Map<String, Set<String>> excludeAttributesCache = new HashMap<>();
 
 
     @Inject
     public FullTextMapperV2(AtlasTypeRegistry typeRegistry, Configuration configuration) {
+        this.typeRegistry  = typeRegistry;
         this.configuration = configuration;
+
         followReferences = this.configuration != null && this.configuration.getBoolean(FULL_TEXT_FOLLOW_REFERENCES, false);
         // If followReferences = false then ignore relationship attr loading
         entityGraphRetriever = new EntityGraphRetriever(typeRegistry, !followReferences);
@@ -90,11 +101,12 @@ public class FullTextMapperV2 {
 
             if (CollectionUtils.isNotEmpty(classifications)) {
                 for (AtlasClassification classification : classifications) {
-                    sb.append(classification.getTypeName()).append(FULL_TEXT_DELIMITER);
+                    final AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(classification.getTypeName());
+                    final Set<String>             excludeAttributes  = getExcludeAttributesForIndexText(classification.getTypeName());
 
-                    Set<String> excludeAttributes = getExcludeAttributesForIndexText(classification.getTypeName());
+                    sb.append(classification.getTypeName()).append(FULL_TEXT_DELIMITER);
 
-                    mapAttributes(classification.getAttributes(), entityWithExtInfo, sb, new HashSet<String>(), excludeAttributes);
+                    mapAttributes(classificationType, classification.getAttributes(), entityWithExtInfo, sb, new HashSet<String>(), excludeAttributes);
                 }
             }
 
@@ -109,13 +121,24 @@ public class FullTextMapperV2 {
     }
 
     public String getIndexTextForEntity(String guid) throws AtlasBaseException {
-        String      ret    = null;
-        AtlasEntity entity = getAndCacheEntity(guid);
+        String                   ret    = null;
+        final AtlasEntity        entity;
+        final AtlasEntityExtInfo entityExtInfo;
+
+        if (followReferences) {
+            AtlasEntityWithExtInfo entityWithExtInfo = getAndCacheEntityWithExtInfo(guid);
+
+            entity        = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
+            entityExtInfo = entityWithExtInfo;
+        } else {
+            entity        = getAndCacheEntity(guid);
+            entityExtInfo = null;
+        }
 
         if (entity != null) {
             StringBuilder sb = new StringBuilder();
 
-            map(entity, null, sb, new HashSet<String>());
+            map(entity, entityExtInfo, sb, new HashSet<String>());
 
             ret = sb.toString();
         }
@@ -132,27 +155,30 @@ public class FullTextMapperV2 {
             return;
         }
 
+        final AtlasEntityType entityType        = typeRegistry.getEntityTypeByName(entity.getTypeName());
+        final Set<String>     excludeAttributes = getExcludeAttributesForIndexText(entity.getTypeName());
+
         processedGuids.add(entity.getGuid());
 
         sb.append(entity.getTypeName()).append(FULL_TEXT_DELIMITER);
 
-        Set<String> excludeAttributes = getExcludeAttributesForIndexText(entity.getTypeName());
+        mapAttributes(entityType, entity.getAttributes(), entityExtInfo, sb, processedGuids, excludeAttributes);
 
-        mapAttributes(entity.getAttributes(), entityExtInfo, sb, processedGuids, excludeAttributes);
-
-        List<AtlasClassification> classifications = entity.getClassifications();
+        final List<AtlasClassification> classifications = entity.getClassifications();
         if (CollectionUtils.isNotEmpty(classifications)) {
             for (AtlasClassification classification : classifications) {
-                sb.append(classification.getTypeName()).append(FULL_TEXT_DELIMITER);
+                final AtlasClassificationType classificationType              = typeRegistry.getClassificationTypeByName(classification.getTypeName());
+                final Set<String>             excludeClassificationAttributes = getExcludeAttributesForIndexText(classification.getTypeName());
 
-                Set<String> excludeClassificationAttributes = getExcludeAttributesForIndexText(classification.getTypeName());
 
-                mapAttributes(classification.getAttributes(), entityExtInfo, sb, processedGuids, excludeClassificationAttributes);
+                sb.append(classification.getTypeName()).append(FULL_TEXT_DELIMITER);
+
+                mapAttributes(classificationType, classification.getAttributes(), entityExtInfo, sb, processedGuids, excludeClassificationAttributes);
             }
         }
     }
 
-    private void mapAttributes(Map<String, Object> attributes, AtlasEntityExtInfo entityExtInfo, StringBuilder sb,
+    private void mapAttributes(AtlasStructType structType, Map<String, Object> attributes, AtlasEntityExtInfo entityExtInfo, StringBuilder sb,
                                Set<String> processedGuids, Set<String> excludeAttributes) throws AtlasBaseException {
         if (MapUtils.isEmpty(attributes)) {
             return;
@@ -162,10 +188,28 @@ public class FullTextMapperV2 {
             String attribKey = attributeEntry.getKey();
             Object attrValue = attributeEntry.getValue();
 
-            if (attrValue == null || isExcludedAttribute(excludeAttributes, attribKey)) {
+            if (attrValue == null || excludeAttributes.contains(attribKey)) {
                 continue;
             }
 
+            if (!followReferences) {
+                AtlasAttribute attribute     = structType != null ? structType.getAttribute(attribKey) : null;
+                AtlasType      attributeType = attribute != null ? attribute.getAttributeType() : null;
+
+                if (attributeType == null) {
+                    continue;
+                }
+
+                if (attributeType instanceof AtlasArrayType) {
+                    attributeType = ((AtlasArrayType) attributeType).getElementType();
+                }
+
+                if (attributeType instanceof AtlasEntityType || attributeType instanceof AtlasBuiltInTypes.AtlasObjectIdType) {
+                    continue;
+                }
+            }
+
+
             sb.append(attribKey).append(FULL_TEXT_DELIMITER);
 
             mapAttribute(attrValue, entityExtInfo, sb, processedGuids);
@@ -249,12 +293,8 @@ public class FullTextMapperV2 {
         return entityWithExtInfo;
     }
 
-    private boolean isExcludedAttribute(Set<String> excludeAttributes, String attributeName) {
-        return CollectionUtils.isNotEmpty(excludeAttributes) && excludeAttributes.contains(attributeName);
-    }
-
     private Set<String> getExcludeAttributesForIndexText(String typeName) {
-        Set<String> ret = null;
+        final Set<String> ret;
 
         if (excludeAttributesCache.containsKey(typeName)) {
             ret = excludeAttributesCache.get(typeName);
@@ -265,9 +305,13 @@ public class FullTextMapperV2 {
 
             if (ArrayUtils.isNotEmpty(excludeAttributes)) {
                 ret = new HashSet<>(Arrays.asList(excludeAttributes));
+            } else {
+                ret = Collections.emptySet();
             }
 
             excludeAttributesCache.put(typeName, ret);
+        } else {
+            ret = Collections.emptySet();
         }
 
         return ret;

http://git-wip-us.apache.org/repos/asf/atlas/blob/beb34506/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
index a8c3363..ca3179a 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
@@ -29,13 +29,13 @@ import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
 import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity;
 
-import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.AtlasRelatedObjectId;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
 import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.atlas.v1.model.instance.Struct;
 import org.apache.atlas.repository.converters.AtlasInstanceConverter;
@@ -437,6 +437,8 @@ public class AtlasEntityChangeNotifier {
             LOG.warn("Unable to determine if FullText is disabled. Proceeding with FullText mapping");
         }
 
+        MetricRecorder metric = RequestContext.get().startMetricRecord("fullTextMapping");
+
         for (AtlasEntityHeader entityHeader : entityHeaders) {
             if(GraphHelper.isInternalType(entityHeader.getTypeName())) {
                 continue;
@@ -457,6 +459,8 @@ public class AtlasEntityChangeNotifier {
                 LOG.error("FullText mapping failed for Vertex[ guid = {} ]", guid, e);
             }
         }
+
+        RequestContext.get().endMetricRecord(metric);
     }
 
     private void updateFullTextMapping(String entityId, List<AtlasClassification> classifications) {
@@ -477,6 +481,8 @@ public class AtlasEntityChangeNotifier {
             return;
         }
 
+        MetricRecorder metric = RequestContext.get().startMetricRecord("fullTextMapping");
+
         try {
             String classificationFullText = fullTextMapperV2.getIndexTextForClassifications(entityId, classifications);
             String existingFullText       = AtlasGraphUtilsV2.getEncodedProperty(atlasVertex, ENTITY_TEXT_PROPERTY_KEY, String.class);
@@ -486,6 +492,8 @@ public class AtlasEntityChangeNotifier {
         } catch (AtlasBaseException e) {
             LOG.error("FullText mapping failed for Vertex[ guid = {} ]", entityId, e);
         }
+
+        RequestContext.get().endMetricRecord(metric);
     }
 
     private void doFullTextMapping(String guid) {

http://git-wip-us.apache.org/repos/asf/atlas/blob/beb34506/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java
index 6580bee..ddab2bf 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java
@@ -18,6 +18,7 @@
 package org.apache.atlas.repository.store.graph.v2;
 
 import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.RequestContext;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.TypeCategory;
 import org.apache.atlas.model.instance.AtlasEntity;
@@ -35,6 +36,7 @@ import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -127,6 +129,8 @@ public class AtlasEntityGraphDiscoveryV2 implements EntityGraphDiscovery {
 
 
     protected void discover() throws AtlasBaseException {
+        MetricRecorder metric = RequestContext.get().startMetricRecord("walkEntityGraph");
+
         EntityStream entityStream = discoveryContext.getEntityStream();
 
         Set<String> walkedEntities = new HashSet<>();
@@ -162,9 +166,13 @@ public class AtlasEntityGraphDiscoveryV2 implements EntityGraphDiscovery {
                 walkedEntities.add(entity.getGuid());
             }
         }
+
+        RequestContext.get().endMetricRecord(metric);
     }
 
     protected void resolveReferences() throws AtlasBaseException {
+        MetricRecorder metric = RequestContext.get().startMetricRecord("resolveReferences");
+
         EntityResolver[] entityResolvers = new EntityResolver[] { new IDBasedEntityResolver(typeRegistry),
                                                                   new UniqAttrBasedEntityResolver(typeRegistry)
                                                                 };
@@ -172,6 +180,8 @@ public class AtlasEntityGraphDiscoveryV2 implements EntityGraphDiscovery {
         for (EntityResolver resolver : entityResolvers) {
             resolver.resolveEntityReferences(discoveryContext);
         }
+
+        RequestContext.get().endMetricRecord(metric);
     }
 
     private void visitReference(AtlasObjectIdType type, Object val) throws AtlasBaseException {

http://git-wip-us.apache.org/repos/asf/atlas/blob/beb34506/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
index d97b74d..35aa3af 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
@@ -41,6 +41,7 @@ import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.type.AtlasTypeUtil;
 import org.apache.atlas.utils.AtlasEntityUtil;
+import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
 import org.apache.atlas.utils.AtlasPerfTracer;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
@@ -671,6 +672,8 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
             perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "createOrUpdate()");
         }
 
+        MetricRecorder metric = RequestContext.get().startMetricRecord("createOrUpdate");
+
         try {
             final EntityMutationContext context = preCreateOrUpdate(entityStream, entityGraphMapper, isPartialUpdate);
 
@@ -730,11 +733,15 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
 
             return ret;
         } finally {
+            RequestContext.get().endMetricRecord(metric);
+
             AtlasPerfTracer.log(perf);
         }
     }
 
     private EntityMutationContext preCreateOrUpdate(EntityStream entityStream, EntityGraphMapper entityGraphMapper, boolean isPartialUpdate) throws AtlasBaseException {
+        MetricRecorder metric = RequestContext.get().startMetricRecord("preCreateOrUpdate");
+
         EntityGraphDiscovery        graphDiscoverer  = new AtlasEntityGraphDiscoveryV2(typeRegistry, entityStream);
         EntityGraphDiscoveryContext discoveryContext = graphDiscoverer.discoverEntities();
         EntityMutationContext       context          = new EntityMutationContext(discoveryContext);
@@ -797,6 +804,8 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
             }
         }
 
+        RequestContext.get().endMetricRecord(metric);
+
         return context;
     }
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/beb34506/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
index 25770a3..69d3f63 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
@@ -20,6 +20,7 @@ package org.apache.atlas.repository.store.graph.v2;
 
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.RequestContext;
 import org.apache.atlas.SortOrder;
 import org.apache.atlas.discovery.SearchProcessor;
 import org.apache.atlas.exception.AtlasBaseException;
@@ -38,6 +39,7 @@ import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasStructType;
 import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
 import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
@@ -268,6 +270,8 @@ public class AtlasGraphUtilsV2 {
     }
 
     public static AtlasVertex findByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> attrValues) {
+        MetricRecorder metric = RequestContext.get().startMetricRecord("findByUniqueAttributes");
+
         AtlasVertex vertex = null;
 
         final Map<String, AtlasAttribute> uniqueAttributes = entityType.getUniqAttributes();
@@ -302,6 +306,8 @@ public class AtlasGraphUtilsV2 {
             }
         }
 
+        RequestContext.get().endMetricRecord(metric);
+
         return vertex;
     }
 
@@ -345,6 +351,8 @@ public class AtlasGraphUtilsV2 {
     }
 
     public static AtlasVertex findByTypeAndPropertyName(String typeName, String propertyName, Object attrVal) {
+        MetricRecorder metric = RequestContext.get().startMetricRecord("findByTypeAndPropertyName");
+
         AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
                                                     .has(Constants.ENTITY_TYPE_PROPERTY_KEY, typeName)
                                                     .has(propertyName, attrVal)
@@ -354,10 +362,14 @@ public class AtlasGraphUtilsV2 {
 
         AtlasVertex vertex = results.hasNext() ? results.next() : null;
 
+        RequestContext.get().endMetricRecord(metric);
+
         return vertex;
     }
 
     public static AtlasVertex findBySuperTypeAndPropertyName(String typeName, String propertyName, Object attrVal) {
+        MetricRecorder metric = RequestContext.get().startMetricRecord("findBySuperTypeAndPropertyName");
+
         AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
                                                     .has(Constants.SUPER_TYPES_PROPERTY_KEY, typeName)
                                                     .has(propertyName, attrVal)
@@ -367,6 +379,8 @@ public class AtlasGraphUtilsV2 {
 
         AtlasVertex vertex = results.hasNext() ? results.next() : null;
 
+        RequestContext.get().endMetricRecord(metric);
+
         return vertex;
     }
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/beb34506/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index 6c830ba..98734cc 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -55,6 +55,7 @@ import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.type.AtlasTypeUtil;
 import org.apache.atlas.utils.AtlasJson;
+import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
@@ -175,6 +176,8 @@ public class EntityGraphMapper {
     }
 
     public EntityMutationResponse mapAttributesAndClassifications(EntityMutationContext context, final boolean isPartialUpdate, final boolean replaceClassifications) throws AtlasBaseException {
+        MetricRecorder metric = RequestContext.get().startMetricRecord("mapAttributesAndClassifications");
+
         EntityMutationResponse resp = new EntityMutationResponse();
 
         Collection<AtlasEntity> createdEntities = context.getCreatedEntities();
@@ -237,6 +240,8 @@ public class EntityGraphMapper {
             }
         }
 
+        RequestContext.get().endMetricRecord(metric);
+
         return resp;
     }
 
@@ -284,6 +289,8 @@ public class EntityGraphMapper {
         }
 
         if (MapUtils.isNotEmpty(struct.getAttributes())) {
+            MetricRecorder metric = RequestContext.get().startMetricRecord("mapAttributes");
+
             AtlasStructType structType = getStructType(struct.getTypeName());
 
             if (op.equals(CREATE)) {
@@ -308,6 +315,8 @@ public class EntityGraphMapper {
             }
 
             updateModificationMetadata(vertex);
+
+            RequestContext.get().endMetricRecord(metric);
         }
 
         if (LOG.isDebugEnabled()) {
@@ -322,6 +331,8 @@ public class EntityGraphMapper {
         }
 
         if (MapUtils.isNotEmpty(entity.getRelationshipAttributes())) {
+            MetricRecorder metric = RequestContext.get().startMetricRecord("mapRelationshipAttributes");
+
             AtlasEntityType entityType = getEntityType(entity.getTypeName());
 
             if (op.equals(CREATE)) {
@@ -343,6 +354,8 @@ public class EntityGraphMapper {
             }
 
             updateModificationMetadata(vertex);
+
+            RequestContext.get().endMetricRecord(metric);
         }
 
         if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/atlas/blob/beb34506/server-api/src/main/java/org/apache/atlas/RequestContext.java
----------------------------------------------------------------------
diff --git a/server-api/src/main/java/org/apache/atlas/RequestContext.java b/server-api/src/main/java/org/apache/atlas/RequestContext.java
index 099d713..29f2974 100644
--- a/server-api/src/main/java/org/apache/atlas/RequestContext.java
+++ b/server-api/src/main/java/org/apache/atlas/RequestContext.java
@@ -23,6 +23,8 @@ import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.store.DeleteType;
+import org.apache.atlas.utils.AtlasPerfMetrics;
+import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,10 +32,11 @@ import org.slf4j.LoggerFactory;
 import java.util.*;
 
 public class RequestContext {
-    private static final Logger LOG = LoggerFactory.getLogger(RequestContext.class);
+    private static final Logger METRICS = LoggerFactory.getLogger("METRICS");
 
     private static final ThreadLocal<RequestContext> CURRENT_CONTEXT = new ThreadLocal<>();
     private static final Set<RequestContext>         ACTIVE_REQUESTS = new HashSet<>();
+    private static final boolean                     isMetricsEnabled = METRICS.isDebugEnabled();
 
     private final long                                   requestTime         = System.currentTimeMillis();
     private final Map<String, AtlasObjectId>             updatedEntities     = new HashMap<>();
@@ -42,6 +45,7 @@ public class RequestContext {
     private final Map<String, AtlasEntityWithExtInfo>    entityExtInfoCache  = new HashMap<>();
     private final Map<String, List<AtlasClassification>> addedPropagations   = new HashMap<>();
     private final Map<String, List<AtlasClassification>> removedPropagations = new HashMap<>();
+    private final AtlasPerfMetrics                       metrics             = isMetricsEnabled ? new AtlasPerfMetrics() : null;
     private       List<EntityGuidPair>                   entityGuidInRequest = null;
 
     private String      user;
@@ -95,6 +99,12 @@ public class RequestContext {
         this.addedPropagations.clear();
         this.removedPropagations.clear();
 
+        if (metrics != null && !metrics.isEmpty()) {
+            METRICS.debug(metrics.toString());
+
+            metrics.clear();
+        }
+
         if (this.entityGuidInRequest != null) {
             this.entityGuidInRequest.clear();
         }
@@ -273,6 +283,16 @@ public class RequestContext {
         return deletedEntities.containsKey(guid);
     }
 
+
+
+    public MetricRecorder startMetricRecord(String name) { return metrics != null ? metrics.getMetricRecorder(name) : null; }
+
+    public void endMetricRecord(MetricRecorder recorder) {
+        if (metrics != null && recorder != null) {
+            metrics.recordMetric(recorder);
+        }
+    }
+
     public void recordEntityGuidUpdate(AtlasEntity entity, String guidInRequest) {
         if (entityGuidInRequest == null) {
             entityGuidInRequest = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/atlas/blob/beb34506/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
index e0a60a1..ef9ebab 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
@@ -32,6 +32,7 @@ import org.apache.atlas.type.AtlasClassificationType;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
 import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.configuration.Configuration;
@@ -118,6 +119,8 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
     }
 
     private void notifyEntityEvents(List<AtlasEntity> entities, OperationType operationType) throws AtlasBaseException {
+        MetricRecorder metric = RequestContext.get().startMetricRecord("entityNotification");
+
         List<EntityNotificationV2> messages = new ArrayList<>();
 
         for (AtlasEntity entity : entities) {
@@ -135,6 +138,8 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
                 throw new AtlasBaseException(AtlasErrorCode.ENTITY_NOTIFICATION_FAILED, e, operationType.name());
             }
         }
+
+        RequestContext.get().endMetricRecord(metric);
     }
 
     private AtlasEntityHeader toNotificationHeader(AtlasEntity entity) {

http://git-wip-us.apache.org/repos/asf/atlas/blob/beb34506/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
index b5e7ed8..520d1ea 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
@@ -19,8 +19,10 @@ package org.apache.atlas.notification;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.atlas.AtlasException;
+import org.apache.atlas.RequestContext;
 import org.apache.atlas.listener.EntityChangeListener;
 import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
+import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
 import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.atlas.v1.model.instance.Struct;
 import org.apache.atlas.v1.model.notification.EntityNotificationV1;
@@ -159,6 +161,8 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
     // send notification of entity change
     private void notifyOfEntityEvent(Collection<Referenceable> entityDefinitions,
                                      OperationType             operationType) throws AtlasException {
+        MetricRecorder metric = RequestContext.get().startMetricRecord("entityNotification");
+
         List<EntityNotificationV1> messages = new ArrayList<>();
 
         for (Referenceable entityDefinition : entityDefinitions) {
@@ -186,6 +190,8 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
         if (!messages.isEmpty()) {
             notificationSender.send(messages);
         }
+
+        RequestContext.get().endMetricRecord(metric);
     }
 
     private List<String> getNotificationAttributes(String entityType) {

http://git-wip-us.apache.org/repos/asf/atlas/blob/beb34506/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index b344c50..003e5b0 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -88,6 +88,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     private static final Logger LOG        = LoggerFactory.getLogger(NotificationHookConsumer.class);
     private static final Logger PERF_LOG   = AtlasPerfTracer.getPerfLogger(NotificationHookConsumer.class);
     private static final Logger FAILED_LOG = LoggerFactory.getLogger("FAILED");
+    private static final Logger LARGE_MESSAGES_LOG = LoggerFactory.getLogger("LARGE_MESSAGES");
 
     private static final int    SC_OK          = 200;
     private static final int    SC_BAD_REQUEST = 400;
@@ -121,6 +122,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     private final int                    maxWaitDuration;
     private final boolean                skipHiveColumnLineageHive20633;
     private final int                    skipHiveColumnLineageHive20633InputsThreshold;
+    private final int                    largeMessageProcessingTimeThresholdMs;
     private final boolean                consumerDisabled;
 
     private NotificationInterface notificationInterface;
@@ -153,6 +155,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         skipHiveColumnLineageHive20633                = applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false);
         skipHiveColumnLineageHive20633InputsThreshold = applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15
         consumerDisabled 							  = applicationProperties.getBoolean(CONSUMER_DISABLED, false);
+        largeMessageProcessingTimeThresholdMs         = applicationProperties.getInt("atlas.notification.consumer.large.message.processing.time.threshold.ms", 60 * 1000);  //  60 sec by default
 
         LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633);
         LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold);
@@ -603,6 +606,15 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             } finally {
                 AtlasPerfTracer.log(perf);
 
+                long msgProcessingTime = perf != null ? perf.getElapsedTime() : 0;
+
+                if (msgProcessingTime > largeMessageProcessingTimeThresholdMs) {
+                    String strMessage = AbstractNotification.getMessageJson(message);
+
+                    LOG.warn("msgProcessingTime={}, msgSize={}, topicOffset={}}", msgProcessingTime, strMessage.length(), kafkaMsg.getOffset());
+                    LARGE_MESSAGES_LOG.warn("{\"msgProcessingTime\":{},\"msgSize\":{},\"topicOffset\":{},\"data\":{}}", msgProcessingTime, strMessage.length(), kafkaMsg.getOffset(), strMessage);
+                }
+
                 if (auditLog != null) {
                     auditLog.setHttpStatus(isFailedMsg ? SC_BAD_REQUEST : SC_OK);
                     auditLog.setTimeTaken(System.currentTimeMillis() - startTime);