You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2016/12/23 01:46:41 UTC

[12/50] [abbrv] incubator-atlas git commit: ATLAS-1104 Get outgoing edges by label doesn't work in some cases (shwethags)

ATLAS-1104 Get outgoing edges by label doesn't work in some cases (shwethags)

(cherry picked from commit 127b378df5ff2712754abf32e86bdc279d7a29e2)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/0cfd957d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/0cfd957d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/0cfd957d

Branch: refs/heads/0.7-incubating
Commit: 0cfd957d705a20bbc410109a2d353f9a06a79315
Parents: 368a2f9
Author: Shwetha GS <ss...@hortonworks.com>
Authored: Tue Aug 9 12:40:50 2016 +0530
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Thu Dec 22 15:11:11 2016 -0800

----------------------------------------------------------------------
 release-log.txt                                 |  1 +
 .../graph/GraphBackedDiscoveryService.java      | 19 +++++-
 .../atlas/repository/graph/GraphHelper.java     | 47 ++++++++++++--
 .../graph/GraphToTypedInstanceMapper.java       |  5 +-
 .../typestore/GraphBackedTypeStore.java         |  2 +-
 .../GraphBackedMetadataRepositoryTest.java      | 68 ++++++++++++++++++++
 .../atlas/repository/graph/GraphHelperTest.java | 35 ++++++++++
 7 files changed, 168 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0cfd957d/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index b18a8a5..874cd23 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -28,6 +28,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
 ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
 
 ALL CHANGES:
+ATLAS-1104 Get outgoing edges by label doesn't work in some cases (shwethags)
 ATLAS-1105 Disable HiveLiteralRewriterTest since its not used currently (sumasai)
 ATLAS-1103 : UI: Search type list is not refreshed (Kalyanikashikar via sumasai)
 ATLAS-1099 UI : multiple tag assign button hides wrongly (Kalyanikashikar via sumasai)

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0cfd957d/repository/src/main/java/org/apache/atlas/discovery/graph/GraphBackedDiscoveryService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/graph/GraphBackedDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/graph/GraphBackedDiscoveryService.java
index 351bd12..ef426e9 100755
--- a/repository/src/main/java/org/apache/atlas/discovery/graph/GraphBackedDiscoveryService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/graph/GraphBackedDiscoveryService.java
@@ -18,10 +18,12 @@
 
 package org.apache.atlas.discovery.graph;
 
+import com.thinkaurelius.titan.core.TitanEdge;
 import com.thinkaurelius.titan.core.TitanGraph;
 import com.thinkaurelius.titan.core.TitanIndexQuery;
 import com.thinkaurelius.titan.core.TitanProperty;
 import com.thinkaurelius.titan.core.TitanVertex;
+import com.tinkerpop.blueprints.Direction;
 import com.tinkerpop.blueprints.Vertex;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.GraphTransaction;
@@ -58,6 +60,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Graph backed implementation of Search.
@@ -204,10 +207,12 @@ public class GraphBackedDiscoveryService implements DiscoveryService {
                         oRow.put(k.toString(), v.toString());
                     }
                 } else if (r instanceof TitanVertex) {
-                    Iterable<TitanProperty> ps = ((TitanVertex) r).getProperties();
+                    TitanVertex vertex = (TitanVertex) r;
+                    oRow.put("id", vertex.getId().toString());
+                    Iterable<TitanProperty> ps = vertex.getProperties();
                     for (TitanProperty tP : ps) {
                         String pName = tP.getPropertyKey().getName();
-                        Object pValue = ((TitanVertex) r).getProperty(pName);
+                        Object pValue = vertex.getProperty(pName);
                         if (pValue != null) {
                             oRow.put(pName, pValue.toString());
                         }
@@ -215,6 +220,16 @@ public class GraphBackedDiscoveryService implements DiscoveryService {
 
                 } else if (r instanceof String) {
                     oRow.put("", r.toString());
+                } else if (r instanceof TitanEdge) {
+                    TitanEdge edge = (TitanEdge) r;
+                    oRow.put("id", edge.getId().toString());
+                    oRow.put("label", edge.getLabel());
+                    oRow.put("inVertex", edge.getVertex(Direction.IN).getId().toString());
+                    oRow.put("outVertex", edge.getVertex(Direction.OUT).getId().toString());
+                    Set<String> propertyKeys = edge.getPropertyKeys();
+                    for (String propertyKey : propertyKeys) {
+                        oRow.put(propertyKey, edge.getProperty(propertyKey).toString());
+                    }
                 } else {
                     throw new DiscoveryException(String.format("Cannot process result %s", o.toString()));
                 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0cfd957d/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
index 81fb76e..6efb22a 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
@@ -131,8 +131,10 @@ public final class GraphHelper {
     }
 
     public Edge getOrCreateEdge(Vertex outVertex, Vertex inVertex, String edgeLabel) {
-        Iterable<Edge> edges = inVertex.getEdges(Direction.IN, edgeLabel);
-        for (Edge edge : edges) {
+        Iterator<Edge> edges = GraphHelper.getAdjacentEdgesByLabel(inVertex, Direction.IN, edgeLabel);
+
+        while (edges.hasNext()) {
+            Edge edge = edges.next();
             if (edge.getVertex(Direction.OUT).getId().toString().equals(outVertex.getId().toString())) {
                 Id.EntityState edgeState = getState(edge);
                 if (edgeState == null || edgeState == Id.EntityState.ACTIVE) {
@@ -193,14 +195,49 @@ public final class GraphHelper {
         return vertex;
     }
 
-    public static Iterator<Edge> getOutGoingEdgesByLabel(Vertex instanceVertex, String edgeLabel) {
+    //In some cases of parallel APIs, the edge is added, but get edge by label doesn't return the edge. ATLAS-1104
+    //So traversing all the edges
+    public static Iterator<Edge> getAdjacentEdgesByLabel(Vertex instanceVertex, Direction direction, final String edgeLabel) {
         LOG.debug("Finding edges for {} with label {}", string(instanceVertex), edgeLabel);
         if(instanceVertex != null && edgeLabel != null) {
-            return instanceVertex.getEdges(Direction.OUT, edgeLabel).iterator();
+            final Iterator<Edge> iterator = instanceVertex.getEdges(direction).iterator();
+            return new Iterator<Edge>() {
+                private Edge edge = null;
+
+                @Override
+                public boolean hasNext() {
+                    while (edge == null && iterator.hasNext()) {
+                        Edge localEdge = iterator.next();
+                        if (localEdge.getLabel().equals(edgeLabel)) {
+                            edge = localEdge;
+                        }
+                    }
+                    return edge != null;
+                }
+
+                @Override
+                public Edge next() {
+                    if (hasNext()) {
+                        Edge localEdge = edge;
+                        edge = null;
+                        return localEdge;
+                    }
+                    return null;
+                }
+
+                @Override
+                public void remove() {
+                    throw new IllegalStateException("Not handled");
+                }
+            };
         }
         return null;
     }
 
+    public static Iterator<Edge> getOutGoingEdgesByLabel(Vertex instanceVertex, String edgeLabel) {
+        return getAdjacentEdgesByLabel(instanceVertex, Direction.OUT, edgeLabel);
+    }
+
     /**
      * Returns the active edge for the given edge label.
      * If the vertex is deleted and there is no active edge, it returns the latest deleted edge
@@ -209,7 +246,7 @@ public final class GraphHelper {
      * @return
      */
     public static Edge getEdgeForLabel(Vertex vertex, String edgeLabel) {
-        Iterator<Edge> iterator = GraphHelper.getOutGoingEdgesByLabel(vertex, edgeLabel);
+        Iterator<Edge> iterator = GraphHelper.getAdjacentEdgesByLabel(vertex, Direction.OUT, edgeLabel);
         Edge latestDeletedEdge = null;
         long latestDeletedEdgeTime = Long.MIN_VALUE;
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0cfd957d/repository/src/main/java/org/apache/atlas/repository/graph/GraphToTypedInstanceMapper.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphToTypedInstanceMapper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphToTypedInstanceMapper.java
index 69223f5..5fbe46b 100644
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphToTypedInstanceMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphToTypedInstanceMapper.java
@@ -44,6 +44,7 @@ import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -295,7 +296,9 @@ public final class GraphToTypedInstanceMapper {
         TraitType traitType, ITypedStruct traitInstance) throws AtlasException {
         String relationshipLabel = GraphHelper.getTraitLabel(typedInstanceTypeName, traitName);
         LOG.debug("Finding edge for {} -> label {} ", instanceVertex, relationshipLabel);
-        for (Edge edge : instanceVertex.getEdges(Direction.OUT, relationshipLabel)) {
+        Iterator<Edge> edgeIterator = GraphHelper.getOutGoingEdgesByLabel(instanceVertex, relationshipLabel);
+        while (edgeIterator.hasNext()) {
+            Edge edge = edgeIterator.next();
             final Vertex traitInstanceVertex = edge.getVertex(Direction.IN);
             if (traitInstanceVertex != null) {
                 LOG.debug("Found trait instance vertex {}, mapping to instance {} ", traitInstanceVertex,

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0cfd957d/repository/src/main/java/org/apache/atlas/repository/typestore/GraphBackedTypeStore.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/typestore/GraphBackedTypeStore.java b/repository/src/main/java/org/apache/atlas/repository/typestore/GraphBackedTypeStore.java
index 3a5829b..4530cac 100755
--- a/repository/src/main/java/org/apache/atlas/repository/typestore/GraphBackedTypeStore.java
+++ b/repository/src/main/java/org/apache/atlas/repository/typestore/GraphBackedTypeStore.java
@@ -282,7 +282,7 @@ public class GraphBackedTypeStore implements ITypeStore {
 
     private ImmutableSet<String> getSuperTypes(Vertex vertex) {
         Set<String> superTypes = new HashSet<>();
-        Iterator<Edge> edges = vertex.getEdges(Direction.OUT, SUPERTYPE_EDGE_LABEL).iterator();
+        Iterator<Edge> edges = GraphHelper.getOutGoingEdgesByLabel(vertex, SUPERTYPE_EDGE_LABEL);
         while (edges.hasNext()) {
             Edge edge = edges.next();
             superTypes.add((String) edge.getVertex(Direction.IN).getProperty(Constants.TYPENAME_PROPERTY_KEY));

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0cfd957d/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java
index 9bd3682..1ebf3ae 100755
--- a/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java
@@ -23,6 +23,8 @@ import com.google.common.collect.ImmutableSet;
 import com.thinkaurelius.titan.core.TitanGraph;
 import com.thinkaurelius.titan.core.util.TitanCleanup;
 import com.tinkerpop.blueprints.Compare;
+import com.tinkerpop.blueprints.Direction;
+import com.tinkerpop.blueprints.Edge;
 import com.tinkerpop.blueprints.GraphQuery;
 import com.tinkerpop.blueprints.Vertex;
 import org.apache.atlas.GraphTransaction;
@@ -66,11 +68,16 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import static org.apache.atlas.typesystem.types.utils.TypesUtil.createClassTypeDef;
 import static org.apache.atlas.typesystem.types.utils.TypesUtil.createUniqueRequiredAttrDef;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
 
 /**
  * GraphBackedMetadataRepository test
@@ -127,6 +134,67 @@ public class GraphBackedMetadataRepositoryTest {
     }
 
     @Test
+    //In some cases of parallel APIs, the edge is added, but get edge by label doesn't return the edge. ATLAS-1104
+    public void testConcurrentCalls() throws Exception {
+        Referenceable dbInstance = new Referenceable(TestUtils.DATABASE_TYPE);
+        dbInstance.set("name", randomString());
+        dbInstance.set("description", "foo database");
+        final String id1 = createEntity(dbInstance).get(0);
+
+        dbInstance.set("name", randomString());
+        final String id2 = createEntity(dbInstance).get(0);
+
+        TraitType piiType = typeSystem.getDataType(TraitType.class, TestUtils.PII);
+        final ITypedStruct trait = piiType.convert(new Struct(TestUtils.PII), Multiplicity.REQUIRED);
+
+        ExecutorService executor = Executors.newFixedThreadPool(3);
+        List<Future<Object>> futures = new ArrayList<>();
+        futures.add(executor.submit(new Callable<Object>() {
+            @Override
+            public Object call() throws Exception {
+                repositoryService.addTrait(id1, trait);
+                return null;
+            }
+        }));
+        futures.add(executor.submit(new Callable<Object>() {
+            @Override
+            public Object call() throws Exception {
+                repositoryService.addTrait(id2, trait);
+                return null;
+            }
+        }));
+        futures.add(executor.submit(new Callable<Object>() {
+            @Override
+            public Object call() throws Exception {
+                return discoveryService.searchByDSL(TestUtils.TABLE_TYPE, new QueryParams(10, 0));
+            }
+        }));
+
+        for (Future future : futures) {
+            future.get();
+        }
+        executor.shutdown();
+
+        boolean validated1 = assertEdge(id1);
+        boolean validated2 = assertEdge(id2);
+        assertNotEquals(validated1, validated2);
+    }
+
+    private boolean assertEdge(String id) throws Exception {
+        TitanGraph graph = graphProvider.get();
+        Vertex vertex = graph.query().has(Constants.GUID_PROPERTY_KEY, id).vertices().iterator().next();
+        Iterable<Edge> edges =
+                vertex.getEdges(Direction.OUT, TestUtils.DATABASE_TYPE + "." + TestUtils.PII);
+        if(!edges.iterator().hasNext()) {
+            repositoryService.deleteTrait(id, TestUtils.PII);
+            List<String> traits = repositoryService.getTraitNames(id);
+            assertTrue(traits.isEmpty());
+            return true;
+        }
+        return false;
+    }
+
+    @Test
     public void testSubmitEntity() throws Exception {
         ITypedReferenceableInstance hrDept = TestUtils.createDeptEg1(typeSystem);
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0cfd957d/repository/src/test/java/org/apache/atlas/repository/graph/GraphHelperTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/graph/GraphHelperTest.java b/repository/src/test/java/org/apache/atlas/repository/graph/GraphHelperTest.java
index 8a0a6bc..428846f 100644
--- a/repository/src/test/java/org/apache/atlas/repository/graph/GraphHelperTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/graph/GraphHelperTest.java
@@ -18,12 +18,29 @@
 
 package org.apache.atlas.repository.graph;
 
+import com.thinkaurelius.titan.core.TitanGraph;
+import com.thinkaurelius.titan.core.TitanVertex;
+import com.tinkerpop.blueprints.Edge;
+import org.apache.atlas.RepositoryMetadataModule;
 import org.testng.annotations.DataProvider;
+import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
+import javax.inject.Inject;
+
+import java.util.Iterator;
+
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 
+@Guice(modules = RepositoryMetadataModule.class)
 public class GraphHelperTest {
+    @Inject
+    private GraphProvider<TitanGraph> graphProvider;
+
     @DataProvider(name = "encodeDecodeTestData")
     private Object[][] createTestData() {
         return new Object[][]{
@@ -48,4 +65,22 @@ public class GraphHelperTest {
         String decodedStr = GraphHelper.decodePropertyKey(encodedStr);
         assertEquals(decodedStr, str);
     }
+
+    @Test
+    public void testGetOutgoingEdgesByLabel() throws Exception {
+        TitanGraph graph = graphProvider.get();
+        TitanVertex v1 = graph.addVertex();
+        TitanVertex v2 = graph.addVertex();
+
+        v1.addEdge("l1", v2);
+        v1.addEdge("l2", v2);
+
+        Iterator<Edge> iterator = GraphHelper.getOutGoingEdgesByLabel(v1, "l1");
+        assertTrue(iterator.hasNext());
+        assertTrue(iterator.hasNext());
+        assertNotNull(iterator.next());
+        assertNull(iterator.next());
+        assertFalse(iterator.hasNext());
+        assertFalse(iterator.hasNext());
+    }
 }