You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2018/12/18 23:06:24 UTC
atlas git commit: ATLAS-3002: added instrumentation to collect time
taken for sub-tasks during entity create/update
Repository: atlas
Updated Branches:
refs/heads/master bd0c5a8a8 -> beb34506a
ATLAS-3002: added instrumentation to collect time taken for sub-tasks during entity create/update
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/beb34506
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/beb34506
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/beb34506
Branch: refs/heads/master
Commit: beb34506a15379af4a306902da049f37b445a2f5
Parents: bd0c5a8
Author: Madhan Neethiraj <ma...@apache.org>
Authored: Tue Dec 18 09:17:29 2018 -0800
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Tue Dec 18 15:02:13 2018 -0800
----------------------------------------------------------------------
.../hive/hook/events/CreateHiveProcess.java | 2 +-
.../apache/atlas/utils/AtlasPerfMetrics.java | 121 +++++++++++++++++++
distro/src/conf/atlas-log4j.xml | 19 ++-
.../apache/atlas/kafka/KafkaNotification.java | 3 +
.../atlas/GraphTransactionInterceptor.java | 6 +
.../repository/audit/EntityAuditListener.java | 33 +++++
.../repository/audit/EntityAuditListenerV2.java | 34 +++++-
.../repository/graph/FullTextMapperV2.java | 86 +++++++++----
.../graph/v2/AtlasEntityChangeNotifier.java | 10 +-
.../graph/v2/AtlasEntityGraphDiscoveryV2.java | 10 ++
.../store/graph/v2/AtlasEntityStoreV2.java | 9 ++
.../store/graph/v2/AtlasGraphUtilsV2.java | 14 +++
.../store/graph/v2/EntityGraphMapper.java | 13 ++
.../java/org/apache/atlas/RequestContext.java | 22 +++-
.../EntityNotificationListenerV2.java | 5 +
.../NotificationEntityChangeListener.java | 6 +
.../notification/NotificationHookConsumer.java | 12 ++
17 files changed, 378 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/atlas/blob/beb34506/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
index 2ccfff4..d61f1d7 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
@@ -185,7 +185,7 @@ public class CreateHiveProcess extends BaseHiveEvent {
AtlasEntity columnLineageProcess = new AtlasEntity(HIVE_TYPE_COLUMN_LINEAGE);
- columnLineageProcess.setAttribute(ATTRIBUTE_NAME, hiveProcess.getAttribute(ATTRIBUTE_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME));
+ columnLineageProcess.setAttribute(ATTRIBUTE_NAME, hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME));
columnLineageProcess.setAttribute(ATTRIBUTE_QUALIFIED_NAME, hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME));
columnLineageProcess.setAttribute(ATTRIBUTE_INPUTS, getObjectIds(inputColumns));
columnLineageProcess.setAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(getObjectId(outputColumn)));
http://git-wip-us.apache.org/repos/asf/atlas/blob/beb34506/common/src/main/java/org/apache/atlas/utils/AtlasPerfMetrics.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/atlas/utils/AtlasPerfMetrics.java b/common/src/main/java/org/apache/atlas/utils/AtlasPerfMetrics.java
new file mode 100644
index 0000000..c72b2c3
--- /dev/null
+++ b/common/src/main/java/org/apache/atlas/utils/AtlasPerfMetrics.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.utils;
+
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class AtlasPerfMetrics {
+ private final Map<String, Metric> metrics = new LinkedHashMap<>();
+
+
+ public MetricRecorder getMetricRecorder(String name) {
+ return new MetricRecorder(name);
+ }
+
+ public void recordMetric(MetricRecorder recorder) {
+ if (recorder != null) {
+ final String name = recorder.name;
+ final long timeTaken = recorder.getElapsedTime();
+
+ Metric metric = metrics.get(name);
+
+ if (metric == null) {
+ metric = new Metric(name);
+
+ metrics.put(name, metric);
+ }
+
+ metric.invocations++;
+ metric.totalTimeMSecs += timeTaken;
+ }
+ }
+
+ public void clear() {
+ metrics.clear();
+ }
+
+ public boolean isEmpty() {
+ return metrics.isEmpty();
+ }
+
+ public Set<String> getMetricsNames() {
+ return metrics.keySet();
+ }
+
+ public Metric getMetric(String name) {
+ return metrics.get(name);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("{");
+
+ if (!metrics.isEmpty()) {
+ for (Metric metric : metrics.values()) {
+ sb.append("\"").append(metric.getName()).append("\":{\"count\":").append(metric.getInvocations()).append(",\"timeTaken\":").append(metric.getTotalTimeMSecs()).append("},");
+ }
+
+ sb.setLength(sb.length() - 1); // remove last ","
+ }
+
+ sb.append("}");
+
+ return sb.toString();
+ }
+
+ public class MetricRecorder {
+ private final String name;
+ private final long startTimeMs = System.currentTimeMillis();
+
+ MetricRecorder(String name) {
+ this.name = name;
+ }
+
+ long getElapsedTime() {
+ return System.currentTimeMillis() - startTimeMs;
+ }
+ }
+
+ public static class Metric {
+ private final String name;
+ private short invocations = 0;
+ private long totalTimeMSecs = 0;
+
+ public Metric(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public short getInvocations() {
+ return invocations;
+ }
+
+ public long getTotalTimeMSecs() {
+ return totalTimeMSecs;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/atlas/blob/beb34506/distro/src/conf/atlas-log4j.xml
----------------------------------------------------------------------
diff --git a/distro/src/conf/atlas-log4j.xml b/distro/src/conf/atlas-log4j.xml
index c183871..9b4fa04 100755
--- a/distro/src/conf/atlas-log4j.xml
+++ b/distro/src/conf/atlas-log4j.xml
@@ -37,6 +37,16 @@
</layout>
</appender>
+ <appender name="LARGE_MESSAGES" class="org.apache.log4j.RollingFileAppender">
+ <param name="File" value="{{log_dir}}/large_messages.log"/>
+ <param name="Append" value="true"/>
+ <param name="MaxFileSize" value="100MB" />
+ <param name="MaxBackupIndex" value="20" />
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %m%n"/>
+ </layout>
+ </appender>
+
<appender name="AUDIT" class="org.apache.log4j.RollingFileAppender">
<param name="File" value="${atlas.log.dir}/audit.log"/>
<param name="Append" value="true"/>
@@ -60,7 +70,7 @@
<param name="File" value="${atlas.log.dir}/failed.log"/>
<param name="Append" value="true"/>
<layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d %m"/>
+ <param name="ConversionPattern" value="%d %m%n"/>
<param name="maxFileSize" value="100MB" />
<param name="maxBackupIndex" value="20" />
</layout>
@@ -119,6 +129,11 @@
<appender-ref ref="AUDIT"/>
</logger>
+ <logger name="LARGE_MESSAGES" additivity="false">
+ <level value="warn"/>
+ <appender-ref ref="LARGE_MESSAGES"/>
+ </logger>
+
<logger name="METRICS" additivity="false">
<level value="debug"/>
<appender-ref ref="METRICS"/>
@@ -126,7 +141,7 @@
<logger name="FAILED" additivity="false">
<level value="info"/>
- <appender-ref ref="AUDIT"/>
+ <appender-ref ref="FAILED"/>
</logger>
<root>
http://git-wip-us.apache.org/repos/asf/atlas/blob/beb34506/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 4bec917..1d0a273 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -106,6 +106,9 @@ public class KafkaNotification extends AbstractNotification implements Service {
properties.put("enable.auto.commit", kafkaConf.getBoolean("enable.auto.commit", oldApiCommitEnableFlag));
properties.put("session.timeout.ms", kafkaConf.getString("session.timeout.ms", "30000"));
+ // if no value is specified for max.poll.records, set to 1
+ properties.put("max.poll.records", kafkaConf.getInt("max.poll.records", 1));
+
LOG.info("<== KafkaNotification()");
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/beb34506/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
index 4c43677..cbd2226 100644
--- a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
+++ b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
@@ -23,6 +23,7 @@ import org.aopalliance.intercept.MethodInvocation;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.exception.NotFoundException;
import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -69,6 +70,7 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
}
boolean isSuccess = false;
+ MetricRecorder metric = null;
try {
try {
@@ -79,6 +81,8 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
LOG.debug("Ignoring commit for nested/inner transaction {}.{}", invokingClass, invokedMethodName);
}
} else {
+ metric = RequestContext.get().startMetricRecord("graphCommit");
+
doCommitOrRollback(invokingClass, invokedMethodName);
}
@@ -97,6 +101,8 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
throw t;
}
} finally {
+ RequestContext.get().endMetricRecord(metric);
+
// Only outer txn can mark as closed
if (!isInnerTxn) {
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/atlas/blob/beb34506/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
index dfacb38..69d373d 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
@@ -24,6 +24,7 @@ import org.apache.atlas.EntityAuditEvent.EntityAuditAction;
import org.apache.atlas.RequestContext;
import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
+import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.instance.Struct;
import org.apache.atlas.type.AtlasEntityType;
@@ -65,6 +66,8 @@ public class EntityAuditListener implements EntityChangeListener {
@Override
public void onEntitiesAdded(Collection<Referenceable> entities, boolean isImport) throws AtlasException {
+ MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
List<EntityAuditEvent> events = new ArrayList<>();
for (Referenceable entity : entities) {
EntityAuditEvent event = createEvent(entity, isImport ? EntityAuditAction.ENTITY_IMPORT_CREATE : EntityAuditAction.ENTITY_CREATE);
@@ -72,10 +75,14 @@ public class EntityAuditListener implements EntityChangeListener {
}
auditRepository.putEventsV1(events);
+
+ RequestContext.get().endMetricRecord(metric);
}
@Override
public void onEntitiesUpdated(Collection<Referenceable> entities, boolean isImport) throws AtlasException {
+ MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
List<EntityAuditEvent> events = new ArrayList<>();
for (Referenceable entity : entities) {
EntityAuditEvent event = createEvent(entity, isImport ? EntityAuditAction.ENTITY_IMPORT_UPDATE : EntityAuditAction.ENTITY_UPDATE);
@@ -83,45 +90,61 @@ public class EntityAuditListener implements EntityChangeListener {
}
auditRepository.putEventsV1(events);
+
+ RequestContext.get().endMetricRecord(metric);
}
@Override
public void onTraitsAdded(Referenceable entity, Collection<? extends Struct> traits) throws AtlasException {
if (traits != null) {
+ MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
for (Struct trait : traits) {
EntityAuditEvent event = createEvent(entity, EntityAuditAction.TAG_ADD,
"Added trait: " + AtlasType.toV1Json(trait));
auditRepository.putEventsV1(event);
}
+
+ RequestContext.get().endMetricRecord(metric);
}
}
@Override
public void onTraitsDeleted(Referenceable entity, Collection<? extends Struct> traits) throws AtlasException {
if (traits != null) {
+ MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
for (Struct trait : traits) {
EntityAuditEvent event = createEvent(entity, EntityAuditAction.TAG_DELETE, "Deleted trait: " + trait.getTypeName());
auditRepository.putEventsV1(event);
}
+
+ RequestContext.get().endMetricRecord(metric);
}
}
@Override
public void onTraitsUpdated(Referenceable entity, Collection<? extends Struct> traits) throws AtlasException {
if (traits != null) {
+ MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
for (Struct trait : traits) {
EntityAuditEvent event = createEvent(entity, EntityAuditAction.TAG_UPDATE,
"Updated trait: " + AtlasType.toV1Json(trait));
auditRepository.putEventsV1(event);
}
+
+ RequestContext.get().endMetricRecord(metric);
}
}
@Override
public void onEntitiesDeleted(Collection<Referenceable> entities, boolean isImport) throws AtlasException {
+ MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
List<EntityAuditEvent> events = new ArrayList<>();
for (Referenceable entity : entities) {
EntityAuditEvent event = createEvent(entity, isImport ? EntityAuditAction.ENTITY_IMPORT_DELETE : EntityAuditAction.ENTITY_DELETE, "Deleted entity");
@@ -129,10 +152,14 @@ public class EntityAuditListener implements EntityChangeListener {
}
auditRepository.putEventsV1(events);
+
+ RequestContext.get().endMetricRecord(metric);
}
@Override
public void onTermAdded(Collection<Referenceable> entities, AtlasGlossaryTerm term) throws AtlasException {
+ MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
List<EntityAuditEvent> events = new ArrayList<>();
for (Referenceable entity : entities) {
@@ -140,10 +167,14 @@ public class EntityAuditListener implements EntityChangeListener {
}
auditRepository.putEventsV1(events);
+
+ RequestContext.get().endMetricRecord(metric);
}
@Override
public void onTermDeleted(Collection<Referenceable> entities, AtlasGlossaryTerm term) throws AtlasException {
+ MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
List<EntityAuditEvent> events = new ArrayList<>();
for (Referenceable entity : entities) {
@@ -151,6 +182,8 @@ public class EntityAuditListener implements EntityChangeListener {
}
auditRepository.putEventsV1(events);
+
+ RequestContext.get().endMetricRecord(metric);
}
public List<EntityAuditEvent> getAuditEvents(String guid) throws AtlasException{
http://git-wip-us.apache.org/repos/asf/atlas/blob/beb34506/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
index 8ca8c9a..16d8879 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
@@ -26,13 +26,13 @@ import org.apache.atlas.listener.EntityChangeListenerV2;
import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
@@ -79,6 +79,8 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
@Override
public void onEntitiesAdded(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException {
+ MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
List<EntityAuditEventV2> events = new ArrayList<>();
for (AtlasEntity entity : entities) {
@@ -88,10 +90,14 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
}
auditRepository.putEventsV2(events);
+
+ RequestContext.get().endMetricRecord(metric);
}
@Override
public void onEntitiesUpdated(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException {
+ MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
List<EntityAuditEventV2> events = new ArrayList<>();
for (AtlasEntity entity : entities) {
@@ -101,10 +107,14 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
}
auditRepository.putEventsV2(events);
+
+ RequestContext.get().endMetricRecord(metric);
}
@Override
public void onEntitiesDeleted(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException {
+ MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
List<EntityAuditEventV2> events = new ArrayList<>();
for (AtlasEntity entity : entities) {
@@ -114,11 +124,15 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
}
auditRepository.putEventsV2(events);
+
+ RequestContext.get().endMetricRecord(metric);
}
@Override
public void onClassificationsAdded(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(classifications)) {
+ MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
List<EntityAuditEventV2> events = new ArrayList<>();
for (AtlasClassification classification : classifications) {
@@ -130,12 +144,16 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
}
auditRepository.putEventsV2(events);
+
+ RequestContext.get().endMetricRecord(metric);
}
}
@Override
public void onClassificationsUpdated(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(classifications)) {
+ MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
List<EntityAuditEventV2> events = new ArrayList<>();
String guid = entity.getGuid();
@@ -154,12 +172,16 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
}
auditRepository.putEventsV2(events);
+
+ RequestContext.get().endMetricRecord(metric);
}
}
@Override
public void onClassificationsDeleted(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(classifications)) {
+ MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
List<EntityAuditEventV2> events = new ArrayList<>();
for (AtlasClassification classification : classifications) {
@@ -171,12 +193,16 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
}
auditRepository.putEventsV2(events);
+
+ RequestContext.get().endMetricRecord(metric);
}
}
@Override
public void onTermAdded(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entities) throws AtlasBaseException {
if (term != null && CollectionUtils.isNotEmpty(entities)) {
+ MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
List<EntityAuditEventV2> events = new ArrayList<>();
for (AtlasRelatedObjectId relatedObjectId : entities) {
@@ -188,12 +214,16 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
}
auditRepository.putEventsV2(events);
+
+ RequestContext.get().endMetricRecord(metric);
}
}
@Override
public void onTermDeleted(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entities) throws AtlasBaseException {
if (term != null && CollectionUtils.isNotEmpty(entities)) {
+ MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
List<EntityAuditEventV2> events = new ArrayList<>();
for (AtlasRelatedObjectId relatedObjectId : entities) {
@@ -205,6 +235,8 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
}
auditRepository.putEventsV2(events);
+
+ RequestContext.get().endMetricRecord(metric);
}
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/beb34506/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java b/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java
index 08ccd9c..77cecff 100644
--- a/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java
@@ -26,6 +26,13 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
+import org.apache.atlas.type.AtlasArrayType;
+import org.apache.atlas.type.AtlasBuiltInTypes;
+import org.apache.atlas.type.AtlasClassificationType;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasStructType;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
@@ -37,6 +44,7 @@ import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -52,15 +60,18 @@ public class FullTextMapperV2 {
private static final String FULL_TEXT_FOLLOW_REFERENCES = "atlas.search.fulltext.followReferences";
private static final String FULL_TEXT_EXCLUDE_ATTRIBUTE_PROPERTY = "atlas.search.fulltext.type";
- private final EntityGraphRetriever entityGraphRetriever;
+ private final AtlasTypeRegistry typeRegistry;
private final Configuration configuration;
+ private final EntityGraphRetriever entityGraphRetriever;
private final boolean followReferences;
private final Map<String, Set<String>> excludeAttributesCache = new HashMap<>();
@Inject
public FullTextMapperV2(AtlasTypeRegistry typeRegistry, Configuration configuration) {
+ this.typeRegistry = typeRegistry;
this.configuration = configuration;
+
followReferences = this.configuration != null && this.configuration.getBoolean(FULL_TEXT_FOLLOW_REFERENCES, false);
// If followReferences = false then ignore relationship attr loading
entityGraphRetriever = new EntityGraphRetriever(typeRegistry, !followReferences);
@@ -90,11 +101,12 @@ public class FullTextMapperV2 {
if (CollectionUtils.isNotEmpty(classifications)) {
for (AtlasClassification classification : classifications) {
- sb.append(classification.getTypeName()).append(FULL_TEXT_DELIMITER);
+ final AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(classification.getTypeName());
+ final Set<String> excludeAttributes = getExcludeAttributesForIndexText(classification.getTypeName());
- Set<String> excludeAttributes = getExcludeAttributesForIndexText(classification.getTypeName());
+ sb.append(classification.getTypeName()).append(FULL_TEXT_DELIMITER);
- mapAttributes(classification.getAttributes(), entityWithExtInfo, sb, new HashSet<String>(), excludeAttributes);
+ mapAttributes(classificationType, classification.getAttributes(), entityWithExtInfo, sb, new HashSet<String>(), excludeAttributes);
}
}
@@ -109,13 +121,24 @@ public class FullTextMapperV2 {
}
public String getIndexTextForEntity(String guid) throws AtlasBaseException {
- String ret = null;
- AtlasEntity entity = getAndCacheEntity(guid);
+ String ret = null;
+ final AtlasEntity entity;
+ final AtlasEntityExtInfo entityExtInfo;
+
+ if (followReferences) {
+ AtlasEntityWithExtInfo entityWithExtInfo = getAndCacheEntityWithExtInfo(guid);
+
+ entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
+ entityExtInfo = entityWithExtInfo;
+ } else {
+ entity = getAndCacheEntity(guid);
+ entityExtInfo = null;
+ }
if (entity != null) {
StringBuilder sb = new StringBuilder();
- map(entity, null, sb, new HashSet<String>());
+ map(entity, entityExtInfo, sb, new HashSet<String>());
ret = sb.toString();
}
@@ -132,27 +155,30 @@ public class FullTextMapperV2 {
return;
}
+ final AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
+ final Set<String> excludeAttributes = getExcludeAttributesForIndexText(entity.getTypeName());
+
processedGuids.add(entity.getGuid());
sb.append(entity.getTypeName()).append(FULL_TEXT_DELIMITER);
- Set<String> excludeAttributes = getExcludeAttributesForIndexText(entity.getTypeName());
+ mapAttributes(entityType, entity.getAttributes(), entityExtInfo, sb, processedGuids, excludeAttributes);
- mapAttributes(entity.getAttributes(), entityExtInfo, sb, processedGuids, excludeAttributes);
-
- List<AtlasClassification> classifications = entity.getClassifications();
+ final List<AtlasClassification> classifications = entity.getClassifications();
if (CollectionUtils.isNotEmpty(classifications)) {
for (AtlasClassification classification : classifications) {
- sb.append(classification.getTypeName()).append(FULL_TEXT_DELIMITER);
+ final AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(classification.getTypeName());
+ final Set<String> excludeClassificationAttributes = getExcludeAttributesForIndexText(classification.getTypeName());
- Set<String> excludeClassificationAttributes = getExcludeAttributesForIndexText(classification.getTypeName());
- mapAttributes(classification.getAttributes(), entityExtInfo, sb, processedGuids, excludeClassificationAttributes);
+ sb.append(classification.getTypeName()).append(FULL_TEXT_DELIMITER);
+
+ mapAttributes(classificationType, classification.getAttributes(), entityExtInfo, sb, processedGuids, excludeClassificationAttributes);
}
}
}
- private void mapAttributes(Map<String, Object> attributes, AtlasEntityExtInfo entityExtInfo, StringBuilder sb,
+ private void mapAttributes(AtlasStructType structType, Map<String, Object> attributes, AtlasEntityExtInfo entityExtInfo, StringBuilder sb,
Set<String> processedGuids, Set<String> excludeAttributes) throws AtlasBaseException {
if (MapUtils.isEmpty(attributes)) {
return;
@@ -162,10 +188,28 @@ public class FullTextMapperV2 {
String attribKey = attributeEntry.getKey();
Object attrValue = attributeEntry.getValue();
- if (attrValue == null || isExcludedAttribute(excludeAttributes, attribKey)) {
+ if (attrValue == null || excludeAttributes.contains(attribKey)) {
continue;
}
+ if (!followReferences) {
+ AtlasAttribute attribute = structType != null ? structType.getAttribute(attribKey) : null;
+ AtlasType attributeType = attribute != null ? attribute.getAttributeType() : null;
+
+ if (attributeType == null) {
+ continue;
+ }
+
+ if (attributeType instanceof AtlasArrayType) {
+ attributeType = ((AtlasArrayType) attributeType).getElementType();
+ }
+
+ if (attributeType instanceof AtlasEntityType || attributeType instanceof AtlasBuiltInTypes.AtlasObjectIdType) {
+ continue;
+ }
+ }
+
+
sb.append(attribKey).append(FULL_TEXT_DELIMITER);
mapAttribute(attrValue, entityExtInfo, sb, processedGuids);
@@ -249,12 +293,8 @@ public class FullTextMapperV2 {
return entityWithExtInfo;
}
- private boolean isExcludedAttribute(Set<String> excludeAttributes, String attributeName) {
- return CollectionUtils.isNotEmpty(excludeAttributes) && excludeAttributes.contains(attributeName);
- }
-
private Set<String> getExcludeAttributesForIndexText(String typeName) {
- Set<String> ret = null;
+ final Set<String> ret;
if (excludeAttributesCache.containsKey(typeName)) {
ret = excludeAttributesCache.get(typeName);
@@ -265,9 +305,13 @@ public class FullTextMapperV2 {
if (ArrayUtils.isNotEmpty(excludeAttributes)) {
ret = new HashSet<>(Arrays.asList(excludeAttributes));
+ } else {
+ ret = Collections.emptySet();
}
excludeAttributesCache.put(typeName, ret);
+ } else {
+ ret = Collections.emptySet();
}
return ret;
http://git-wip-us.apache.org/repos/asf/atlas/blob/beb34506/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
index a8c3363..ca3179a 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
@@ -29,13 +29,13 @@ import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.instance.Struct;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
@@ -437,6 +437,8 @@ public class AtlasEntityChangeNotifier {
LOG.warn("Unable to determine if FullText is disabled. Proceeding with FullText mapping");
}
+ MetricRecorder metric = RequestContext.get().startMetricRecord("fullTextMapping");
+
for (AtlasEntityHeader entityHeader : entityHeaders) {
if(GraphHelper.isInternalType(entityHeader.getTypeName())) {
continue;
@@ -457,6 +459,8 @@ public class AtlasEntityChangeNotifier {
LOG.error("FullText mapping failed for Vertex[ guid = {} ]", guid, e);
}
}
+
+ RequestContext.get().endMetricRecord(metric);
}
private void updateFullTextMapping(String entityId, List<AtlasClassification> classifications) {
@@ -477,6 +481,8 @@ public class AtlasEntityChangeNotifier {
return;
}
+ MetricRecorder metric = RequestContext.get().startMetricRecord("fullTextMapping");
+
try {
String classificationFullText = fullTextMapperV2.getIndexTextForClassifications(entityId, classifications);
String existingFullText = AtlasGraphUtilsV2.getEncodedProperty(atlasVertex, ENTITY_TEXT_PROPERTY_KEY, String.class);
@@ -486,6 +492,8 @@ public class AtlasEntityChangeNotifier {
} catch (AtlasBaseException e) {
LOG.error("FullText mapping failed for Vertex[ guid = {} ]", entityId, e);
}
+
+ RequestContext.get().endMetricRecord(metric);
}
private void doFullTextMapping(String guid) {
http://git-wip-us.apache.org/repos/asf/atlas/blob/beb34506/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java
index 6580bee..ddab2bf 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java
@@ -18,6 +18,7 @@
package org.apache.atlas.repository.store.graph.v2;
import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasEntity;
@@ -35,6 +36,7 @@ import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -127,6 +129,8 @@ public class AtlasEntityGraphDiscoveryV2 implements EntityGraphDiscovery {
protected void discover() throws AtlasBaseException {
+ MetricRecorder metric = RequestContext.get().startMetricRecord("walkEntityGraph");
+
EntityStream entityStream = discoveryContext.getEntityStream();
Set<String> walkedEntities = new HashSet<>();
@@ -162,9 +166,13 @@ public class AtlasEntityGraphDiscoveryV2 implements EntityGraphDiscovery {
walkedEntities.add(entity.getGuid());
}
}
+
+ RequestContext.get().endMetricRecord(metric);
}
protected void resolveReferences() throws AtlasBaseException {
+ MetricRecorder metric = RequestContext.get().startMetricRecord("resolveReferences");
+
EntityResolver[] entityResolvers = new EntityResolver[] { new IDBasedEntityResolver(typeRegistry),
new UniqAttrBasedEntityResolver(typeRegistry)
};
@@ -172,6 +180,8 @@ public class AtlasEntityGraphDiscoveryV2 implements EntityGraphDiscovery {
for (EntityResolver resolver : entityResolvers) {
resolver.resolveEntityReferences(discoveryContext);
}
+
+ RequestContext.get().endMetricRecord(metric);
}
private void visitReference(AtlasObjectIdType type, Object val) throws AtlasBaseException {
http://git-wip-us.apache.org/repos/asf/atlas/blob/beb34506/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
index d97b74d..35aa3af 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
@@ -41,6 +41,7 @@ import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.utils.AtlasEntityUtil;
+import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
@@ -671,6 +672,8 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "createOrUpdate()");
}
+ MetricRecorder metric = RequestContext.get().startMetricRecord("createOrUpdate");
+
try {
final EntityMutationContext context = preCreateOrUpdate(entityStream, entityGraphMapper, isPartialUpdate);
@@ -730,11 +733,15 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
return ret;
} finally {
+ RequestContext.get().endMetricRecord(metric);
+
AtlasPerfTracer.log(perf);
}
}
private EntityMutationContext preCreateOrUpdate(EntityStream entityStream, EntityGraphMapper entityGraphMapper, boolean isPartialUpdate) throws AtlasBaseException {
+ MetricRecorder metric = RequestContext.get().startMetricRecord("preCreateOrUpdate");
+
EntityGraphDiscovery graphDiscoverer = new AtlasEntityGraphDiscoveryV2(typeRegistry, entityStream);
EntityGraphDiscoveryContext discoveryContext = graphDiscoverer.discoverEntities();
EntityMutationContext context = new EntityMutationContext(discoveryContext);
@@ -797,6 +804,8 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
}
}
+ RequestContext.get().endMetricRecord(metric);
+
return context;
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/beb34506/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
index 25770a3..69d3f63 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
@@ -20,6 +20,7 @@ package org.apache.atlas.repository.store.graph.v2;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.RequestContext;
import org.apache.atlas.SortOrder;
import org.apache.atlas.discovery.SearchProcessor;
import org.apache.atlas.exception.AtlasBaseException;
@@ -38,6 +39,7 @@ import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
@@ -268,6 +270,8 @@ public class AtlasGraphUtilsV2 {
}
public static AtlasVertex findByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> attrValues) {
+ MetricRecorder metric = RequestContext.get().startMetricRecord("findByUniqueAttributes");
+
AtlasVertex vertex = null;
final Map<String, AtlasAttribute> uniqueAttributes = entityType.getUniqAttributes();
@@ -302,6 +306,8 @@ public class AtlasGraphUtilsV2 {
}
}
+ RequestContext.get().endMetricRecord(metric);
+
return vertex;
}
@@ -345,6 +351,8 @@ public class AtlasGraphUtilsV2 {
}
public static AtlasVertex findByTypeAndPropertyName(String typeName, String propertyName, Object attrVal) {
+ MetricRecorder metric = RequestContext.get().startMetricRecord("findByTypeAndPropertyName");
+
AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
.has(Constants.ENTITY_TYPE_PROPERTY_KEY, typeName)
.has(propertyName, attrVal)
@@ -354,10 +362,14 @@ public class AtlasGraphUtilsV2 {
AtlasVertex vertex = results.hasNext() ? results.next() : null;
+ RequestContext.get().endMetricRecord(metric);
+
return vertex;
}
public static AtlasVertex findBySuperTypeAndPropertyName(String typeName, String propertyName, Object attrVal) {
+ MetricRecorder metric = RequestContext.get().startMetricRecord("findBySuperTypeAndPropertyName");
+
AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
.has(Constants.SUPER_TYPES_PROPERTY_KEY, typeName)
.has(propertyName, attrVal)
@@ -367,6 +379,8 @@ public class AtlasGraphUtilsV2 {
AtlasVertex vertex = results.hasNext() ? results.next() : null;
+ RequestContext.get().endMetricRecord(metric);
+
return vertex;
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/beb34506/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index 6c830ba..98734cc 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -55,6 +55,7 @@ import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.utils.AtlasJson;
+import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
@@ -175,6 +176,8 @@ public class EntityGraphMapper {
}
public EntityMutationResponse mapAttributesAndClassifications(EntityMutationContext context, final boolean isPartialUpdate, final boolean replaceClassifications) throws AtlasBaseException {
+ MetricRecorder metric = RequestContext.get().startMetricRecord("mapAttributesAndClassifications");
+
EntityMutationResponse resp = new EntityMutationResponse();
Collection<AtlasEntity> createdEntities = context.getCreatedEntities();
@@ -237,6 +240,8 @@ public class EntityGraphMapper {
}
}
+ RequestContext.get().endMetricRecord(metric);
+
return resp;
}
@@ -284,6 +289,8 @@ public class EntityGraphMapper {
}
if (MapUtils.isNotEmpty(struct.getAttributes())) {
+ MetricRecorder metric = RequestContext.get().startMetricRecord("mapAttributes");
+
AtlasStructType structType = getStructType(struct.getTypeName());
if (op.equals(CREATE)) {
@@ -308,6 +315,8 @@ public class EntityGraphMapper {
}
updateModificationMetadata(vertex);
+
+ RequestContext.get().endMetricRecord(metric);
}
if (LOG.isDebugEnabled()) {
@@ -322,6 +331,8 @@ public class EntityGraphMapper {
}
if (MapUtils.isNotEmpty(entity.getRelationshipAttributes())) {
+ MetricRecorder metric = RequestContext.get().startMetricRecord("mapRelationshipAttributes");
+
AtlasEntityType entityType = getEntityType(entity.getTypeName());
if (op.equals(CREATE)) {
@@ -343,6 +354,8 @@ public class EntityGraphMapper {
}
updateModificationMetadata(vertex);
+
+ RequestContext.get().endMetricRecord(metric);
}
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/atlas/blob/beb34506/server-api/src/main/java/org/apache/atlas/RequestContext.java
----------------------------------------------------------------------
diff --git a/server-api/src/main/java/org/apache/atlas/RequestContext.java b/server-api/src/main/java/org/apache/atlas/RequestContext.java
index 099d713..29f2974 100644
--- a/server-api/src/main/java/org/apache/atlas/RequestContext.java
+++ b/server-api/src/main/java/org/apache/atlas/RequestContext.java
@@ -23,6 +23,8 @@ import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.store.DeleteType;
+import org.apache.atlas.utils.AtlasPerfMetrics;
+import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,10 +32,11 @@ import org.slf4j.LoggerFactory;
import java.util.*;
public class RequestContext {
- private static final Logger LOG = LoggerFactory.getLogger(RequestContext.class);
+ private static final Logger METRICS = LoggerFactory.getLogger("METRICS");
private static final ThreadLocal<RequestContext> CURRENT_CONTEXT = new ThreadLocal<>();
private static final Set<RequestContext> ACTIVE_REQUESTS = new HashSet<>();
+ private static final boolean isMetricsEnabled = METRICS.isDebugEnabled();
private final long requestTime = System.currentTimeMillis();
private final Map<String, AtlasObjectId> updatedEntities = new HashMap<>();
@@ -42,6 +45,7 @@ public class RequestContext {
private final Map<String, AtlasEntityWithExtInfo> entityExtInfoCache = new HashMap<>();
private final Map<String, List<AtlasClassification>> addedPropagations = new HashMap<>();
private final Map<String, List<AtlasClassification>> removedPropagations = new HashMap<>();
+ private final AtlasPerfMetrics metrics = isMetricsEnabled ? new AtlasPerfMetrics() : null;
private List<EntityGuidPair> entityGuidInRequest = null;
private String user;
@@ -95,6 +99,12 @@ public class RequestContext {
this.addedPropagations.clear();
this.removedPropagations.clear();
+ if (metrics != null && !metrics.isEmpty()) {
+ METRICS.debug(metrics.toString());
+
+ metrics.clear();
+ }
+
if (this.entityGuidInRequest != null) {
this.entityGuidInRequest.clear();
}
@@ -273,6 +283,16 @@ public class RequestContext {
return deletedEntities.containsKey(guid);
}
+
+
+ public MetricRecorder startMetricRecord(String name) { return metrics != null ? metrics.getMetricRecorder(name) : null; }
+
+ public void endMetricRecord(MetricRecorder recorder) {
+ if (metrics != null && recorder != null) {
+ metrics.recordMetric(recorder);
+ }
+ }
+
public void recordEntityGuidUpdate(AtlasEntity entity, String guidInRequest) {
if (entityGuidInRequest == null) {
entityGuidInRequest = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/atlas/blob/beb34506/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
index e0a60a1..ef9ebab 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
@@ -32,6 +32,7 @@ import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.configuration.Configuration;
@@ -118,6 +119,8 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
}
private void notifyEntityEvents(List<AtlasEntity> entities, OperationType operationType) throws AtlasBaseException {
+ MetricRecorder metric = RequestContext.get().startMetricRecord("entityNotification");
+
List<EntityNotificationV2> messages = new ArrayList<>();
for (AtlasEntity entity : entities) {
@@ -135,6 +138,8 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
throw new AtlasBaseException(AtlasErrorCode.ENTITY_NOTIFICATION_FAILED, e, operationType.name());
}
}
+
+ RequestContext.get().endMetricRecord(metric);
}
private AtlasEntityHeader toNotificationHeader(AtlasEntity entity) {
http://git-wip-us.apache.org/repos/asf/atlas/blob/beb34506/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
index b5e7ed8..520d1ea 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
@@ -19,8 +19,10 @@ package org.apache.atlas.notification;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.AtlasException;
+import org.apache.atlas.RequestContext;
import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
+import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.instance.Struct;
import org.apache.atlas.v1.model.notification.EntityNotificationV1;
@@ -159,6 +161,8 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
// send notification of entity change
private void notifyOfEntityEvent(Collection<Referenceable> entityDefinitions,
OperationType operationType) throws AtlasException {
+ MetricRecorder metric = RequestContext.get().startMetricRecord("entityNotification");
+
List<EntityNotificationV1> messages = new ArrayList<>();
for (Referenceable entityDefinition : entityDefinitions) {
@@ -186,6 +190,8 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
if (!messages.isEmpty()) {
notificationSender.send(messages);
}
+
+ RequestContext.get().endMetricRecord(metric);
}
private List<String> getNotificationAttributes(String entityType) {
http://git-wip-us.apache.org/repos/asf/atlas/blob/beb34506/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index b344c50..003e5b0 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -88,6 +88,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class);
private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger(NotificationHookConsumer.class);
private static final Logger FAILED_LOG = LoggerFactory.getLogger("FAILED");
+ private static final Logger LARGE_MESSAGES_LOG = LoggerFactory.getLogger("LARGE_MESSAGES");
private static final int SC_OK = 200;
private static final int SC_BAD_REQUEST = 400;
@@ -121,6 +122,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private final int maxWaitDuration;
private final boolean skipHiveColumnLineageHive20633;
private final int skipHiveColumnLineageHive20633InputsThreshold;
+ private final int largeMessageProcessingTimeThresholdMs;
private final boolean consumerDisabled;
private NotificationInterface notificationInterface;
@@ -153,6 +155,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
skipHiveColumnLineageHive20633 = applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false);
skipHiveColumnLineageHive20633InputsThreshold = applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15
consumerDisabled = applicationProperties.getBoolean(CONSUMER_DISABLED, false);
+ largeMessageProcessingTimeThresholdMs = applicationProperties.getInt("atlas.notification.consumer.large.message.processing.time.threshold.ms", 60 * 1000); // 60 sec by default
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633);
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold);
@@ -603,6 +606,15 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
} finally {
AtlasPerfTracer.log(perf);
+ long msgProcessingTime = perf != null ? perf.getElapsedTime() : 0;
+
+ if (msgProcessingTime > largeMessageProcessingTimeThresholdMs) {
+ String strMessage = AbstractNotification.getMessageJson(message);
+
+ LOG.warn("msgProcessingTime={}, msgSize={}, topicOffset={}}", msgProcessingTime, strMessage.length(), kafkaMsg.getOffset());
+ LARGE_MESSAGES_LOG.warn("{\"msgProcessingTime\":{},\"msgSize\":{},\"topicOffset\":{},\"data\":{}}", msgProcessingTime, strMessage.length(), kafkaMsg.getOffset(), strMessage);
+ }
+
if (auditLog != null) {
auditLog.setHttpStatus(isFailedMsg ? SC_BAD_REQUEST : SC_OK);
auditLog.setTimeTaken(System.currentTimeMillis() - startTime);