You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2020/04/30 04:04:16 UTC

[atlas] branch master updated: ATLAS-3762: Improve Edge creator using Genuine iterator.

This is an automated email from the ASF dual-hosted git repository.

amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 25f3002  ATLAS-3762: Improve Edge creator using Genuine iterator.
25f3002 is described below

commit 25f3002e0e84927eb39cebb5708d77ef81755d79
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Wed Apr 29 16:32:29 2020 -0700

    ATLAS-3762: Improve Edge creator using Genuine iterator.
---
 .../atlas/repository/graphdb/AtlasVertex.java      | 10 +++++
 .../repository/graphdb/janus/AtlasJanusGraph.java  | 36 +++++++++++++++---
 .../repository/graphdb/janus/AtlasJanusVertex.java | 17 +++++++++
 .../apache/atlas/repository/graph/GraphHelper.java | 25 ++++++++++++-
 .../store/graph/v2/AtlasRelationshipStoreV2.java   | 43 ++++++++++++++++------
 5 files changed, 111 insertions(+), 20 deletions(-)

diff --git a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasVertex.java b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasVertex.java
index 9406e26..20e6177 100644
--- a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasVertex.java
+++ b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasVertex.java
@@ -45,6 +45,16 @@ public interface AtlasVertex<V, E> extends AtlasElement {
      */
     Iterable<AtlasEdge<V, E>> getEdges(AtlasEdgeDirection direction, String[] edgeLabels);
 
+    /** Counts the edges of this vertex that match the given direction and label. */
+    long getEdgesCount(AtlasEdgeDirection direction, String edgeLabel);
+
+    /**
+     * Tells whether this vertex has at least one edge in direction {@code dir}
+     * whose label is {@code edgeLabel}.
+     * @return true if such an edge exists
+     */
+    boolean hasEdges(AtlasEdgeDirection dir, String edgeLabel);
+
     /**
      * Gets the edges associated with this vertex going the
      * specified direction.
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
index eb02062..a30dbc7 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
@@ -18,6 +18,7 @@
 package org.apache.atlas.repository.graphdb.janus;
 
 import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.atlas.ApplicationProperties;
@@ -25,7 +26,19 @@ import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.groovy.GroovyExpression;
-import org.apache.atlas.repository.graphdb.*;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasGraphIndexClient;
+import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
+import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
+import org.apache.atlas.repository.graphdb.AtlasGraphTraversal;
+import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
+import org.apache.atlas.repository.graphdb.AtlasIndexQueryParameter;
+import org.apache.atlas.repository.graphdb.AtlasPropertyKey;
+import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.graphdb.GraphIndexQueryParameters;
+import org.apache.atlas.repository.graphdb.GremlinVersion;
 import org.apache.atlas.repository.graphdb.janus.query.AtlasJanusGraphQuery;
 import org.apache.atlas.repository.graphdb.utils.IteratorToIterableAdapter;
 import org.apache.atlas.type.AtlasType;
@@ -62,9 +75,12 @@ import javax.script.ScriptEngine;
 import javax.script.ScriptException;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.util.*;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_DEFAULT;
 import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_PROPERTY;
@@ -403,7 +419,11 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE
     }
 
     public Iterable<AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>> wrapVertices(Iterable<? extends Vertex> it) {
-        return StreamSupport.stream(it.spliterator(), false).map(input -> GraphDbObjectFactory.createVertex(AtlasJanusGraph.this, input)).collect(Collectors.toList());
+        // Lazy view: a raw vertex is wrapped only when the returned Iterable is iterated; no list is built here.
+        final Function<Vertex, AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>> toAtlasVertex =
+                rawVertex -> GraphDbObjectFactory.createVertex(this, rawVertex);
+
+        return Iterables.transform(it, toAtlasVertex);
     }
 
     public Iterable<AtlasEdge<AtlasJanusVertex, AtlasJanusEdge>> wrapEdges(Iterator<? extends Edge> it) {
@@ -413,7 +433,11 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE
     }
 
     public Iterable<AtlasEdge<AtlasJanusVertex, AtlasJanusEdge>> wrapEdges(Iterable<? extends Edge> it) {
-        return StreamSupport.stream(it.spliterator(), false).map(input -> GraphDbObjectFactory.createEdge(AtlasJanusGraph.this, input)).collect(Collectors.toList());
+        // Lazy view: a raw edge is wrapped only when the returned Iterable is iterated; no list is built here.
+        final Function<Edge, AtlasEdge<AtlasJanusVertex, AtlasJanusEdge>> toAtlasEdge =
+                rawEdge -> GraphDbObjectFactory.createEdge(this, rawEdge);
+
+        return Iterables.transform(it, toAtlasEdge);
     }
 
     public void addMultiProperties(Set<String> names) {
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusVertex.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusVertex.java
index fdc9fd0..b6e2c26 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusVertex.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusVertex.java
@@ -20,12 +20,14 @@ package org.apache.atlas.repository.graphdb.janus;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.stream.StreamSupport;
 
 import org.apache.atlas.repository.graphdb.AtlasEdge;
 import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
 import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.graphdb.AtlasVertexQuery;
+import org.apache.atlas.repository.graphdb.utils.IteratorToIterableAdapter;
 import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
@@ -78,6 +80,21 @@ public class AtlasJanusVertex extends AtlasJanusElement<Vertex> implements Atlas
         return graph.wrapEdges(edges);
     }
 
+    @Override
+    public long getEdgesCount(AtlasEdgeDirection dir, String edgeLabel) {
+        Direction      direction = AtlasJanusObjectFactory.createDirection(dir);
+        Iterator<Edge> it        = getWrappedElement().edges(direction, edgeLabel);
+        // Sequential on purpose: parallel splitting of a one-shot iterator only adds buffering and worker-thread hand-offs.
+        return StreamSupport.stream(new IteratorToIterableAdapter<Edge>(it).spliterator(), false).count();
+    }
+
+    @Override
+    public boolean hasEdges(AtlasEdgeDirection dir, String edgeLabel) {
+        // A single hasNext() probe answers the question; no edge is consumed from the iterator.
+        final Direction tinkerpopDirection = AtlasJanusObjectFactory.createDirection(dir);
+        return getWrappedElement().edges(tinkerpopDirection, edgeLabel).hasNext();
+    }
+
     private JanusGraphVertex getAsJanusVertex() {
         return (JanusGraphVertex)getWrappedElement();
     }
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
index 2b8227a..b9e3a5e 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
@@ -35,7 +35,6 @@ import org.apache.atlas.repository.graphdb.AtlasVertexQuery;
 import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
 import org.apache.atlas.type.AtlasArrayType;
 import org.apache.atlas.type.AtlasMapType;
-import org.apache.atlas.util.AtlasGremlinQueryProvider;
 import org.apache.atlas.utils.AtlasPerfMetrics;
 import org.apache.atlas.v1.model.instance.Id;
 import org.apache.atlas.v1.model.instance.Referenceable;
@@ -72,7 +71,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.UUID;
 
 import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
 import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
@@ -269,6 +267,21 @@ public final class GraphHelper {
         return ret;
     }
 
+    /** Counts the edges with the given label and direction on the vertex; 0 when the vertex or the label is null. */
+    public static long getAdjacentEdgesCountByLabel(AtlasVertex instanceVertex, AtlasEdgeDirection direction, final String edgeLabel) {
+        AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("getAdjacentEdgesCountByLabel");
+        try {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Finding edges for {} with label {}", string(instanceVertex), edgeLabel);
+            }
+
+            return (instanceVertex != null && edgeLabel != null) ? instanceVertex.getEdgesCount(direction, edgeLabel) : 0;
+        } finally {
+            // End the metric even when the graph call throws, so the recorder is always closed.
+            RequestContext.get().endMetricRecord(metric);
+        }
+    }
+
     public static boolean isPropagationEnabled(AtlasVertex classificationVertex) {
         boolean ret = false;
 
@@ -437,6 +450,14 @@ public final class GraphHelper {
         return getAdjacentEdgesByLabel(instanceVertex, AtlasEdgeDirection.OUT, edgeLabel);
     }
 
+    public static long getOutGoingEdgesCountByLabel(AtlasVertex instanceVertex, String edgeLabel) {
+        return getAdjacentEdgesCountByLabel(instanceVertex, AtlasEdgeDirection.OUT, edgeLabel); // OUT direction only
+    }
+
+    public static long getInComingEdgesCountByLabel(AtlasVertex instanceVertex, String edgeLabel) {
+        return getAdjacentEdgesCountByLabel(instanceVertex, AtlasEdgeDirection.IN, edgeLabel); // IN direction only
+    }
+
     public AtlasEdge getEdgeForLabel(AtlasVertex vertex, String edgeLabel, AtlasRelationshipEdgeDirection edgeDirection) {
         AtlasEdge ret;
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
index d1c1f12..ab431bc 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
@@ -19,6 +19,7 @@
 package org.apache.atlas.repository.store.graph.v2;
 
 import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.RequestContext;
 import org.apache.atlas.annotation.GraphTransaction;
 import org.apache.atlas.authorize.AtlasAuthorizationUtils;
 import org.apache.atlas.authorize.AtlasPrivilege;
@@ -49,6 +50,7 @@ import org.apache.atlas.type.AtlasRelationshipType;
 import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.atlas.utils.AtlasPerfMetrics;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
@@ -67,24 +69,25 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
+import java.util.function.Function;
 
+import static org.apache.atlas.AtlasConfiguration.NOTIFICATION_RELATIONSHIPS_ENABLED;
 import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
 import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
-import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.*;
+import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.BOTH;
+import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.NONE;
+import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.ONE_TO_TWO;
+import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.TWO_TO_ONE;
 import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY;
 import static org.apache.atlas.repository.Constants.HOME_ID_KEY;
 import static org.apache.atlas.repository.Constants.PROVENANCE_TYPE_KEY;
 import static org.apache.atlas.repository.Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY;
 import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY;
 import static org.apache.atlas.repository.Constants.VERSION_PROPERTY_KEY;
-import static org.apache.atlas.AtlasConfiguration.NOTIFICATION_RELATIONSHIPS_ENABLED;
-
-
 import static org.apache.atlas.repository.graph.GraphHelper.getBlockedClassificationIds;
 import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEntityGuid;
 import static org.apache.atlas.repository.graph.GraphHelper.getClassificationName;
 import static org.apache.atlas.repository.graph.GraphHelper.getPropagatableClassifications;
-import static org.apache.atlas.repository.graph.GraphHelper.getIncomingEdgesByLabel;
 import static org.apache.atlas.repository.graph.GraphHelper.getPropagateTags;
 import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getState;
 import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getTypeName;
@@ -776,23 +779,39 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
     }
 
     public AtlasEdge getRelationshipEdge(AtlasVertex fromVertex, AtlasVertex toVertex, String relationshipLabel) {
-        AtlasEdge           ret           = null;
-        Iterator<AtlasEdge> edgesIterator = getIncomingEdgesByLabel(toVertex, relationshipLabel);
+        AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("getRelationshipEdge");
+        // Looks up the edge labelled relationshipLabel that runs from fromVertex to toVertex; ret stays null if none is found.
+        AtlasEdge ret = null;
+        // Both endpoints must have such an edge; the endpoint with fewer of them is the one that gets scanned.
+        if (toVertex.hasEdges(AtlasEdgeDirection.IN, relationshipLabel) && fromVertex.hasEdges(AtlasEdgeDirection.OUT, relationshipLabel)) {
+            long fromVertexOutgoingEdgeCount = graphHelper.getOutGoingEdgesCountByLabel(fromVertex, relationshipLabel);
+            long toVertexIncomingEdgeCount = graphHelper.getInComingEdgesCountByLabel(toVertex, relationshipLabel);
+            if (toVertexIncomingEdgeCount < fromVertexOutgoingEdgeCount) {
+                Iterator<AtlasEdge> edgesIteratorIn = graphHelper.getIncomingEdgesByLabel(toVertex, relationshipLabel);
+                ret = getActiveEdgeFromList(edgesIteratorIn, fromVertex.getId(), e -> e.getOutVertex().getId());
+            } else {
+                Iterator<AtlasEdge> edgesIteratorOut = graphHelper.getOutGoingEdgesByLabel(fromVertex, relationshipLabel);
+                ret = getActiveEdgeFromList(edgesIteratorOut, toVertex.getId(), e -> e.getInVertex().getId());
+            }
+        }
 
+        RequestContext.get().endMetricRecord(metric);
+        return ret;
+    }
+
+    private AtlasEdge getActiveEdgeFromList(Iterator<AtlasEdge> edgesIterator, Object vertexIdToCompare, Function<AtlasEdge, Object> edgeIdFn) {
         while (edgesIterator != null && edgesIterator.hasNext()) {
             AtlasEdge edge = edgesIterator.next();
-
             if (edge != null) {
                 Status status = graphHelper.getStatus(edge);
 
-                if ((status == null || status == ACTIVE) && edge.getOutVertex().getId().equals(fromVertex.getId())) {
-                    ret = edge;
-                    break;
+                if ((status == null || status == ACTIVE) && edgeIdFn.apply(edge).equals(vertexIdToCompare)) {
+                    return edge; // first edge that is ACTIVE (or has no status) and whose edgeIdFn id equals vertexIdToCompare
                 }
             }
         }
 
-        return ret;
+        return null; // iterator was null, exhausted, or held no matching edge
     }
 
     private Long getRelationshipVersion(AtlasRelationship relationship) {