You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by su...@apache.org on 2016/10/10 22:37:52 UTC
[4/8] incubator-atlas git commit: ATLAS-694 Update Atlas code to use
graph abstraction layer (jnhagelb via sumasai)
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/repository/src/main/java/org/apache/atlas/repository/graph/IAtlasGraphProvider.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/IAtlasGraphProvider.java b/repository/src/main/java/org/apache/atlas/repository/graph/IAtlasGraphProvider.java
new file mode 100755
index 0000000..a2cac2d
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/IAtlasGraphProvider.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.graph;
+
+import org.apache.atlas.repository.RepositoryException;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+
+/**
+ * Provides a mechanism to control what graph is used in various places. This
+ * allows the graph to be mocked out during unit testing and be initialized
+ * lazily.
+ */
+public interface IAtlasGraphProvider {
+ /** Returns the graph to use (possibly a mock, possibly lazily initialized). NOTE(review): presumably throws when the graph cannot be obtained -- confirm against implementations. */
+ AtlasGraph get() throws RepositoryException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/repository/src/main/java/org/apache/atlas/repository/graph/SoftDeleteHandler.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/SoftDeleteHandler.java b/repository/src/main/java/org/apache/atlas/repository/graph/SoftDeleteHandler.java
index 25aa7c5..92e43cb 100644
--- a/repository/src/main/java/org/apache/atlas/repository/graph/SoftDeleteHandler.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/SoftDeleteHandler.java
@@ -19,8 +19,8 @@
package org.apache.atlas.repository.graph;
import com.google.inject.Inject;
-import com.tinkerpop.blueprints.Edge;
-import com.tinkerpop.blueprints.Vertex;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.typesystem.persistence.Id;
@@ -36,7 +36,7 @@ public class SoftDeleteHandler extends DeleteHandler {
}
@Override
- protected void _deleteVertex(Vertex instanceVertex, boolean force) {
+ protected void _deleteVertex(AtlasVertex instanceVertex, boolean force) {
if (force) {
graphHelper.removeVertex(instanceVertex);
} else {
@@ -50,7 +50,7 @@ public class SoftDeleteHandler extends DeleteHandler {
}
@Override
- protected void deleteEdge(Edge edge, boolean force) throws AtlasException {
+ protected void deleteEdge(AtlasEdge edge, boolean force) throws AtlasException {
if (force) {
graphHelper.removeEdge(edge);
} else {
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/repository/src/main/java/org/apache/atlas/repository/graph/TitanGraphProvider.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/TitanGraphProvider.java b/repository/src/main/java/org/apache/atlas/repository/graph/TitanGraphProvider.java
deleted file mode 100755
index 7a5e6a9..0000000
--- a/repository/src/main/java/org/apache/atlas/repository/graph/TitanGraphProvider.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.repository.graph;
-
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.repository.graphdb.titan0.Titan0Database;
-import org.apache.commons.configuration.Configuration;
-
-import com.thinkaurelius.titan.core.TitanGraph;
-
-/**
- * Temporary TitanGraphProvider to use until the graph database abstraction
- * layer is fully in place. Delegates to the Titan 0.5.4 implementation. This
- * will be removed once the abstraction layer is being used.
- */
-public class TitanGraphProvider implements GraphProvider<TitanGraph> {
-
- /* (non-Javadoc)
- * @see org.apache.atlas.repository.graph.GraphProvider#get()
- */
- @Override
- public TitanGraph get() {
- return Titan0Database.getGraphInstance();
- }
-
- public static TitanGraph getGraphInstance() {
- return Titan0Database.getGraphInstance();
- }
-
- public static Configuration getConfiguration() throws AtlasException {
- return Titan0Database.getConfiguration();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java
index 2e0414e..47ae5e1 100644
--- a/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java
@@ -17,15 +17,26 @@
*/
package org.apache.atlas.repository.graph;
-import com.google.inject.Inject;
-import com.thinkaurelius.titan.core.SchemaViolationException;
-import com.tinkerpop.blueprints.Direction;
-import com.tinkerpop.blueprints.Edge;
-import com.tinkerpop.blueprints.Vertex;
+import static org.apache.atlas.repository.graph.GraphHelper.string;
+
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.RepositoryException;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.atlas.typesystem.ITypedInstance;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
@@ -48,23 +59,12 @@ import org.apache.atlas.utils.MD5Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.security.MessageDigest;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static org.apache.atlas.repository.graph.GraphHelper.string;
+import com.google.inject.Inject;
public final class TypedInstanceToGraphMapper {
private static final Logger LOG = LoggerFactory.getLogger(TypedInstanceToGraphMapper.class);
- private final Map<Id, Vertex> idToVertexMap = new HashMap<>();
+ private final Map<Id, AtlasVertex> idToVertexMap = new HashMap<>();
private final TypeSystem typeSystem = TypeSystem.getInstance();
private static final GraphHelper graphHelper = GraphHelper.getInstance();
@@ -143,7 +143,7 @@ public final class TypedInstanceToGraphMapper {
//new vertex, set all the properties
String guid = addOrUpdateAttributesAndTraits(operation, instance);
guids.add(guid);
- } catch (SchemaViolationException e) {
+ } catch (AtlasSchemaViolationException e) {
throw new EntityExistsException(instance, e);
}
}
@@ -159,7 +159,7 @@ public final class TypedInstanceToGraphMapper {
throw new RepositoryException("id cannot be null");
}
- Vertex instanceVertex = idToVertexMap.get(id);
+ AtlasVertex instanceVertex = idToVertexMap.get(id);
// add the attributes for the instance
ClassType classType = typeSystem.getDataType(ClassType.class, typedInstance.getTypeName());
@@ -174,7 +174,7 @@ public final class TypedInstanceToGraphMapper {
return getId(typedInstance)._getId();
}
- void mapInstanceToVertex(ITypedInstance typedInstance, Vertex instanceVertex,
+ void mapInstanceToVertex(ITypedInstance typedInstance, AtlasVertex instanceVertex,
Map<String, AttributeInfo> fields, boolean mapOnlyUniqueAttributes, Operation operation)
throws AtlasException {
@@ -189,7 +189,7 @@ public final class TypedInstanceToGraphMapper {
RequestContext.get().getRequestTime());
}
- void mapAttributeToVertex(ITypedInstance typedInstance, Vertex instanceVertex,
+ void mapAttributeToVertex(ITypedInstance typedInstance, AtlasVertex instanceVertex,
AttributeInfo attributeInfo, Operation operation) throws AtlasException {
Object attrValue = typedInstance.get(attributeInfo.name);
LOG.debug("Mapping attribute {} = {}", attributeInfo.name, attrValue);
@@ -213,11 +213,11 @@ public final class TypedInstanceToGraphMapper {
case CLASS:
String edgeLabel = graphHelper.getEdgeLabel(typedInstance, attributeInfo);
- Edge currentEdge = graphHelper.getEdgeForLabel(instanceVertex, edgeLabel);
- String newEdgeId = addOrUpdateReference(instanceVertex, attributeInfo, attributeInfo.dataType(),
+ AtlasEdge currentEdge = graphHelper.getEdgeForLabel(instanceVertex, edgeLabel);
+ AtlasEdge newEdge = addOrUpdateReference(instanceVertex, attributeInfo, attributeInfo.dataType(),
attrValue, currentEdge, edgeLabel, operation);
- if (currentEdge != null && !currentEdge.getId().toString().equals(newEdgeId)) {
+ if (currentEdge != null && !currentEdge.equals(newEdge)) {
deleteHandler.deleteEdgeReference(currentEdge, attributeInfo.dataType().getTypeCategory(),
attributeInfo.isComposite, true);
}
@@ -245,7 +245,7 @@ public final class TypedInstanceToGraphMapper {
Id id = instance.getId();
if (!idToVertexMap.containsKey(id)) {
- Vertex instanceVertex;
+ AtlasVertex instanceVertex;
if (id.isAssigned()) { // has a GUID
LOG.debug("Instance has an assigned id {}", instance.getId()._getId());
instanceVertex = graphHelper.getVertexForGUID(id.id);
@@ -291,13 +291,13 @@ public final class TypedInstanceToGraphMapper {
private void addFullTextProperty(List<ITypedReferenceableInstance> instances, FullTextMapper fulltextMapper) throws AtlasException {
for (ITypedReferenceableInstance typedInstance : instances) { // Traverse
- Vertex instanceVertex = getClassVertex(typedInstance);
+ AtlasVertex instanceVertex = getClassVertex(typedInstance);
String fullText = fulltextMapper.mapRecursive(instanceVertex, true);
GraphHelper.setProperty(instanceVertex, Constants.ENTITY_TEXT_PROPERTY_KEY, fullText);
}
}
- private void addTraits(ITypedReferenceableInstance typedInstance, Vertex instanceVertex, ClassType classType)
+ private void addTraits(ITypedReferenceableInstance typedInstance, AtlasVertex instanceVertex, ClassType classType)
throws AtlasException {
for (String traitName : typedInstance.getTraits()) {
LOG.debug("mapping trait {}", traitName);
@@ -311,7 +311,7 @@ public final class TypedInstanceToGraphMapper {
/******************************************** ARRAY **************************************************/
- private void mapArrayCollectionToVertex(ITypedInstance typedInstance, Vertex instanceVertex,
+ private void mapArrayCollectionToVertex(ITypedInstance typedInstance, AtlasVertex instanceVertex,
AttributeInfo attributeInfo, Operation operation) throws AtlasException {
LOG.debug("Mapping instance {} for array attribute {} vertex {}", typedInstance.toShortString(),
attributeInfo.name, string(instanceVertex));
@@ -323,16 +323,18 @@ public final class TypedInstanceToGraphMapper {
return;
}
- String propertyName = GraphHelper.getQualifiedFieldName(typedInstance, attributeInfo);
- List<String> currentElements = GraphHelper.getProperty(instanceVertex, propertyName);
IDataType elementType = ((DataTypes.ArrayType) attributeInfo.dataType()).getElemType();
+ String propertyName = GraphHelper.getQualifiedFieldName(typedInstance, attributeInfo);
+
+ List<Object> currentElements = GraphHelper.getArrayElementsProperty(elementType, instanceVertex, propertyName);
+
List<Object> newElementsCreated = new ArrayList<>();
if (!newAttributeEmpty) {
if (newElements != null && !newElements.isEmpty()) {
int index = 0;
for (; index < newElements.size(); index++) {
- String currentElement = (currentElements != null && index < currentElements.size()) ?
+ Object currentElement = (currentElements != null && index < currentElements.size()) ?
currentElements.get(index) : null;
LOG.debug("Adding/updating element at position {}, current element {}, new element {}", index,
currentElement, newElements.get(index));
@@ -343,18 +345,21 @@ public final class TypedInstanceToGraphMapper {
}
}
- List<String> additionalEdges = removeUnusedEntries(instanceVertex, propertyName, currentElements,
- newElementsCreated, elementType, attributeInfo);
- newElementsCreated.addAll(additionalEdges);
+ if(GraphHelper.isReference(elementType)) {
+
+ List<AtlasEdge> additionalEdges = removeUnusedEntries(instanceVertex, propertyName, (List)currentElements,
+ (List)newElementsCreated, elementType, attributeInfo);
+ newElementsCreated.addAll(additionalEdges);
+ }
// for dereference on way out
- GraphHelper.setProperty(instanceVertex, propertyName, newElementsCreated);
+ GraphHelper.setArrayElementsProperty(elementType, instanceVertex, propertyName, newElementsCreated);
}
//Removes unused edges from the old collection, compared to the new collection
- private List<String> removeUnusedEntries(Vertex instanceVertex, String edgeLabel,
- Collection<String> currentEntries,
- Collection<Object> newEntries,
+ private List<AtlasEdge> removeUnusedEntries(AtlasVertex instanceVertex, String edgeLabel,
+ Collection<AtlasEdge> currentEntries,
+ Collection<AtlasEdge> newEntries,
IDataType entryType, AttributeInfo attributeInfo) throws AtlasException {
if (currentEntries != null && !currentEntries.isEmpty()) {
LOG.debug("Removing unused entries from the old collection");
@@ -362,20 +367,17 @@ public final class TypedInstanceToGraphMapper {
|| entryType.getTypeCategory() == DataTypes.TypeCategory.CLASS) {
//Remove the edges for (current edges - new edges)
- List<String> cloneElements = new ArrayList<>(currentEntries);
+ List<AtlasEdge> cloneElements = new ArrayList<>(currentEntries);
cloneElements.removeAll(newEntries);
- List<String> additionalElements = new ArrayList<>();
+ List<AtlasEdge> additionalElements = new ArrayList<>();
LOG.debug("Removing unused entries from the old collection - {}", cloneElements);
if (!cloneElements.isEmpty()) {
- for (String edgeIdForDelete : cloneElements) {
- Edge edge = graphHelper.getEdgeByEdgeId(instanceVertex, edgeLabel, edgeIdForDelete);
- if(edge != null) {
- boolean deleted = deleteHandler.deleteEdgeReference(edge, entryType.getTypeCategory(),
- attributeInfo.isComposite, true);
- if (!deleted) {
- additionalElements.add(edgeIdForDelete);
- }
+ for (AtlasEdge edge : cloneElements) {
+ boolean deleted = deleteHandler.deleteEdgeReference(edge, entryType.getTypeCategory(),
+ attributeInfo.isComposite, true);
+ if (!deleted) {
+ additionalElements.add(edge);
}
}
}
@@ -387,7 +389,7 @@ public final class TypedInstanceToGraphMapper {
/******************************************** MAP **************************************************/
- private void mapMapCollectionToVertex(ITypedInstance typedInstance, Vertex instanceVertex,
+ private void mapMapCollectionToVertex(ITypedInstance typedInstance, AtlasVertex instanceVertex,
AttributeInfo attributeInfo, Operation operation) throws AtlasException {
LOG.debug("Mapping instance {} to vertex {} for attribute {}", typedInstance.toShortString(), string(instanceVertex),
attributeInfo.name);
@@ -402,20 +404,20 @@ public final class TypedInstanceToGraphMapper {
IDataType elementType = ((DataTypes.MapType) attributeInfo.dataType()).getValueType();
String propertyName = GraphHelper.getQualifiedFieldName(typedInstance, attributeInfo);
- Map<String, String> currentMap = new HashMap<>();
+ Map<String, Object> currentMap = new HashMap<>();
Map<String, Object> newMap = new HashMap<>();
- List<String> currentKeys = GraphHelper.getProperty(instanceVertex, propertyName);
+ List<String> currentKeys = GraphHelper.getListProperty(instanceVertex, propertyName);
if (currentKeys != null && !currentKeys.isEmpty()) {
for (String key : currentKeys) {
String propertyNameForKey = GraphHelper.getQualifiedNameForMapKey(propertyName, key);
- String propertyValueForKey = GraphHelper.getProperty(instanceVertex, propertyNameForKey).toString();
+ Object propertyValueForKey = GraphHelper.getMapValueProperty(elementType, instanceVertex, propertyNameForKey);
currentMap.put(key, propertyValueForKey);
}
}
if (!newAttributeEmpty) {
- for (Map.Entry entry : newAttribute.entrySet()) {
+ for (Map.Entry<Object,Object> entry : newAttribute.entrySet()) {
String keyStr = entry.getKey().toString();
String propertyNameForKey = GraphHelper.getQualifiedNameForMapKey(propertyName, keyStr);
@@ -423,53 +425,53 @@ public final class TypedInstanceToGraphMapper {
entry.getValue(), currentMap.get(keyStr), propertyNameForKey, operation);
//Add/Update/Remove property value
- GraphHelper.setProperty(instanceVertex, propertyNameForKey, newEntry);
+ GraphHelper.setMapValueProperty(elementType, instanceVertex, propertyNameForKey, newEntry);
newMap.put(keyStr, newEntry);
}
}
- Map<String, String> additionalMap =
+ Map<String, Object> additionalMap =
removeUnusedMapEntries(instanceVertex, propertyName, currentMap, newMap, elementType, attributeInfo);
Set<String> newKeys = new HashSet<>(newMap.keySet());
newKeys.addAll(additionalMap.keySet());
+
// for dereference on way out
- GraphHelper.setProperty(instanceVertex, propertyName, new ArrayList<>(newKeys));
+ GraphHelper.setListProperty(instanceVertex, propertyName, new ArrayList<>(newKeys));
}
//Remove unused entries from map
- private Map<String, String> removeUnusedMapEntries(Vertex instanceVertex, String propertyName,
- Map<String, String> currentMap,
- Map<String, Object> newMap, IDataType elementType,
- AttributeInfo attributeInfo)
- throws AtlasException {
- boolean reference = (elementType.getTypeCategory() == DataTypes.TypeCategory.STRUCT
- || elementType.getTypeCategory() == DataTypes.TypeCategory.CLASS);
- Map<String, String> additionalMap = new HashMap<>();
-
+ private Map<String, Object> removeUnusedMapEntries(
+ AtlasVertex instanceVertex, String propertyName,
+ Map<String, Object> currentMap,
+ Map<String, Object> newMap, IDataType elementType,
+ AttributeInfo attributeInfo)
+ throws AtlasException {
+
+ Map<String, Object> additionalMap = new HashMap<>();
for (String currentKey : currentMap.keySet()) {
+
boolean shouldDeleteKey = !newMap.containsKey(currentKey);
- if (reference) {
- String currentEdge = currentMap.get(currentKey);
+ if (GraphHelper.isReference(elementType)) {
+
//Delete the edge reference if its not part of new edges created/updated
+ AtlasEdge currentEdge = (AtlasEdge)currentMap.get(currentKey);
+
if (!newMap.values().contains(currentEdge)) {
- String edgeLabel = GraphHelper.getQualifiedNameForMapKey(propertyName, currentKey);
- Edge edge = graphHelper.getEdgeByEdgeId(instanceVertex, edgeLabel, currentMap.get(currentKey));
- if(edge != null) {
- boolean deleted =
- deleteHandler.deleteEdgeReference(edge, elementType.getTypeCategory(), attributeInfo.isComposite, true);
- if (!deleted) {
- additionalMap.put(currentKey, currentEdge);
- shouldDeleteKey = false;
- }
+
+ boolean deleted =
+ deleteHandler.deleteEdgeReference(currentEdge, elementType.getTypeCategory(), attributeInfo.isComposite, true);
+ if (!deleted) {
+ additionalMap.put(currentKey, currentEdge);
+ shouldDeleteKey = false;
}
}
}
if (shouldDeleteKey) {
String propertyNameForKey = GraphHelper.getQualifiedNameForMapKey(propertyName, currentKey);
- graphHelper.setProperty(instanceVertex, propertyNameForKey, null);
+ GraphHelper.setProperty(instanceVertex, propertyNameForKey, null);
}
}
return additionalMap;
@@ -477,8 +479,8 @@ public final class TypedInstanceToGraphMapper {
/******************************************** ARRAY & MAP **************************************************/
- private Object addOrUpdateCollectionEntry(Vertex instanceVertex, AttributeInfo attributeInfo,
- IDataType elementType, Object newAttributeValue, String currentValue,
+ private Object addOrUpdateCollectionEntry(AtlasVertex instanceVertex, AttributeInfo attributeInfo,
+ IDataType elementType, Object newAttributeValue, Object currentValue,
String propertyName, Operation operation)
throws AtlasException {
@@ -496,8 +498,7 @@ public final class TypedInstanceToGraphMapper {
case STRUCT:
case CLASS:
final String edgeLabel = GraphHelper.EDGE_LABEL_PREFIX + propertyName;
- Edge currentEdge = graphHelper.getEdgeByEdgeId(instanceVertex, edgeLabel, currentValue);
- return addOrUpdateReference(instanceVertex, attributeInfo, elementType, newAttributeValue, currentEdge,
+ return addOrUpdateReference(instanceVertex, attributeInfo, elementType, newAttributeValue, (AtlasEdge)currentValue,
edgeLabel, operation);
default:
@@ -505,8 +506,8 @@ public final class TypedInstanceToGraphMapper {
}
}
- private String addOrUpdateReference(Vertex instanceVertex, AttributeInfo attributeInfo,
- IDataType attributeType, Object newAttributeValue, Edge currentEdge,
+ private AtlasEdge addOrUpdateReference(AtlasVertex instanceVertex, AttributeInfo attributeInfo,
+ IDataType attributeType, Object newAttributeValue, AtlasEdge currentEdge,
String edgeLabel, Operation operation) throws AtlasException {
switch (attributeType.getTypeCategory()) {
case STRUCT:
@@ -523,26 +524,26 @@ public final class TypedInstanceToGraphMapper {
}
/******************************************** STRUCT **************************************************/
- private String addOrUpdateStruct(Vertex instanceVertex, AttributeInfo attributeInfo,
- ITypedStruct newAttributeValue, Edge currentEdge,
- String edgeLabel, Operation operation) throws AtlasException {
- String newEdgeId = null;
- if (currentEdge != null && newAttributeValue != null) {
+
+ private AtlasEdge addOrUpdateStruct(AtlasVertex instanceVertex, AttributeInfo attributeInfo,
+ ITypedStruct newAttributeValue, AtlasEdge currentEdge,
+ String edgeLabel, Operation operation) throws AtlasException {
+ AtlasEdge newEdge = null;
+ if (GraphHelper.elementExists(currentEdge) && newAttributeValue != null) {
//update
updateStructVertex(newAttributeValue, currentEdge, operation);
- newEdgeId = currentEdge.getId().toString();
- } else if (currentEdge == null && newAttributeValue != null) {
+ newEdge = currentEdge;
+ } else if (! GraphHelper.elementExists(currentEdge) && newAttributeValue != null) {
//add
- Edge newEdge = addStructVertex(newAttributeValue, instanceVertex, attributeInfo, edgeLabel);
- newEdgeId = newEdge.getId().toString();
+ newEdge = addStructVertex(newAttributeValue, instanceVertex, attributeInfo, edgeLabel);
}
- return newEdgeId;
+ return newEdge;
}
- private Edge addStructVertex(ITypedStruct structInstance, Vertex instanceVertex,
+ private AtlasEdge addStructVertex(ITypedStruct structInstance, AtlasVertex instanceVertex,
AttributeInfo attributeInfo, String edgeLabel) throws AtlasException {
// add a new vertex for the struct or trait instance
- Vertex structInstanceVertex = graphHelper.createVertexWithoutIdentity(structInstance.getTypeName(), null,
+ AtlasVertex structInstanceVertex = graphHelper.createVertexWithoutIdentity(structInstance.getTypeName(), null,
Collections.<String>emptySet()); // no super types for struct type
LOG.debug("created vertex {} for struct {} value {}", string(structInstanceVertex), attributeInfo.name,
structInstance.toShortString());
@@ -551,22 +552,22 @@ public final class TypedInstanceToGraphMapper {
mapInstanceToVertex(structInstance, structInstanceVertex, structInstance.fieldMapping().fields, false,
Operation.CREATE);
// add an edge to the newly created vertex from the parent
- Edge newEdge = graphHelper.getOrCreateEdge(instanceVertex, structInstanceVertex, edgeLabel);
+ AtlasEdge newEdge = graphHelper.getOrCreateEdge(instanceVertex, structInstanceVertex, edgeLabel);
return newEdge;
}
- private void updateStructVertex(ITypedStruct newAttributeValue, Edge currentEdge,
- Operation operation) throws AtlasException {
+ private void updateStructVertex(ITypedStruct newAttributeValue, AtlasEdge currentEdge,
+ Operation operation) throws AtlasException {
//Already existing vertex. Update
- Vertex structInstanceVertex = currentEdge.getVertex(Direction.IN);
+ AtlasVertex structInstanceVertex = currentEdge.getInVertex();
LOG.debug("Updating struct vertex {} with struct {}", string(structInstanceVertex), newAttributeValue.toShortString());
// Update attributes
final MessageDigest digester = MD5Utils.getDigester();
String newSignature = newAttributeValue.getSignatureHash(digester);
- String curSignature = GraphHelper.getProperty(structInstanceVertex, SIGNATURE_HASH_PROPERTY_KEY);
+ String curSignature = GraphHelper.getSingleValuedProperty(structInstanceVertex, SIGNATURE_HASH_PROPERTY_KEY, String.class);
if (!newSignature.equals(curSignature)) {
//Update struct vertex instance only if there is a change
@@ -578,33 +579,34 @@ public final class TypedInstanceToGraphMapper {
/******************************************** CLASS **************************************************/
- private String addOrUpdateClassVertex(Vertex instanceVertex, Edge currentEdge,
- ITypedReferenceableInstance newAttributeValue, AttributeInfo attributeInfo,
- String edgeLabel) throws AtlasException {
- Vertex newReferenceVertex = getClassVertex(newAttributeValue);
- if(newReferenceVertex == null && newAttributeValue != null) {
+ private AtlasEdge addOrUpdateClassVertex(AtlasVertex instanceVertex, AtlasEdge currentEdge,
+ ITypedReferenceableInstance newAttributeValue, AttributeInfo attributeInfo,
+ String edgeLabel) throws AtlasException {
+ AtlasVertex newReferenceVertex = getClassVertex(newAttributeValue);
+ if( ! GraphHelper.elementExists(newReferenceVertex) && newAttributeValue != null) {
LOG.error("Could not find vertex for Class Reference " + newAttributeValue);
throw new EntityNotFoundException("Could not find vertex for Class Reference " + newAttributeValue);
}
- String newEdgeId = null;
- if (currentEdge != null && newAttributeValue != null) {
- newEdgeId = updateClassEdge(instanceVertex, currentEdge, newAttributeValue, newReferenceVertex,
+ AtlasEdge newEdge = null;
+ if (GraphHelper.elementExists(currentEdge) && newAttributeValue != null) {
+ newEdge = updateClassEdge(instanceVertex, currentEdge, newAttributeValue, newReferenceVertex,
attributeInfo, edgeLabel);
- } else if (currentEdge == null && newAttributeValue != null){
- Edge newEdge = addClassEdge(instanceVertex, newReferenceVertex, edgeLabel);
- newEdgeId = newEdge.getId().toString();
+ } else if (! GraphHelper.elementExists(currentEdge) && newAttributeValue != null){
+ newEdge = addClassEdge(instanceVertex, newReferenceVertex, edgeLabel);
+
}
- return newEdgeId;
+ return newEdge;
}
- private Edge addClassEdge(Vertex instanceVertex, Vertex toVertex, String edgeLabel) throws AtlasException {
+
+ private AtlasEdge addClassEdge(AtlasVertex instanceVertex, AtlasVertex toVertex, String edgeLabel) throws AtlasException {
// add an edge to the class vertex from the instance
return graphHelper.getOrCreateEdge(instanceVertex, toVertex, edgeLabel);
}
- private Vertex getClassVertex(ITypedReferenceableInstance typedReference) throws EntityNotFoundException {
- Vertex referenceVertex = null;
+ private AtlasVertex getClassVertex(ITypedReferenceableInstance typedReference) throws EntityNotFoundException {
+ AtlasVertex referenceVertex = null;
Id id = null;
if (typedReference != null) {
id = typedReference instanceof Id ? (Id) typedReference : typedReference.getId();
@@ -625,7 +627,7 @@ public final class TypedInstanceToGraphMapper {
Id id = typedReference instanceof Id ? (Id) typedReference : typedReference.getId();
if (id.isUnassigned()) {
- Vertex classVertex = idToVertexMap.get(id);
+ AtlasVertex classVertex = idToVertexMap.get(id);
String guid = GraphHelper.getIdFromVertex(classVertex);
id = new Id(guid, 0, typedReference.getTypeName());
}
@@ -633,48 +635,48 @@ public final class TypedInstanceToGraphMapper {
}
- private String updateClassEdge(Vertex instanceVertex, Edge currentEdge,
- ITypedReferenceableInstance newAttributeValue,
- Vertex newVertex, AttributeInfo attributeInfo,
- String edgeLabel) throws AtlasException {
+ private AtlasEdge updateClassEdge(AtlasVertex instanceVertex, AtlasEdge currentEdge,
+ ITypedReferenceableInstance newAttributeValue,
+ AtlasVertex newVertex, AttributeInfo attributeInfo,
+ String edgeLabel) throws AtlasException {
LOG.debug("Updating {} for reference attribute {}", string(currentEdge), attributeInfo.name);
// Update edge if it exists
- Vertex currentVertex = currentEdge.getVertex(Direction.IN);
+ AtlasVertex currentVertex = currentEdge.getInVertex();
String currentEntityId = GraphHelper.getIdFromVertex(currentVertex);
String newEntityId = getId(newAttributeValue).id;
- String newEdgeId = currentEdge.getId().toString();
+ AtlasEdge newEdge = currentEdge;
if (!currentEntityId.equals(newEntityId)) {
// add an edge to the class vertex from the instance
if (newVertex != null) {
- Edge newEdge = graphHelper.getOrCreateEdge(instanceVertex, newVertex, edgeLabel);
- newEdgeId = newEdge.getId().toString();
+ newEdge = graphHelper.getOrCreateEdge(instanceVertex, newVertex, edgeLabel);
+
}
}
- return newEdgeId;
+ return newEdge;
}
/******************************************** TRAITS ****************************************************/
- void mapTraitInstanceToVertex(ITypedStruct traitInstance, IDataType entityType, Vertex parentInstanceVertex)
+ void mapTraitInstanceToVertex(ITypedStruct traitInstance, IDataType entityType, AtlasVertex parentInstanceVertex)
throws AtlasException {
- // add a new vertex for the struct or trait instance
+ // add a new AtlasVertex for the struct or trait instance
final String traitName = traitInstance.getTypeName();
- Vertex traitInstanceVertex = graphHelper.createVertexWithoutIdentity(traitInstance.getTypeName(), null,
+ AtlasVertex traitInstanceVertex = graphHelper.createVertexWithoutIdentity(traitInstance.getTypeName(), null,
typeSystem.getDataType(TraitType.class, traitName).getAllSuperTypeNames());
LOG.debug("created vertex {} for trait {}", string(traitInstanceVertex), traitName);
- // map all the attributes to this newly created vertex
+ // map all the attributes to this newly created AtlasVertex
mapInstanceToVertex(traitInstance, traitInstanceVertex, traitInstance.fieldMapping().fields, false, Operation.CREATE);
- // add an edge to the newly created vertex from the parent
+ // add an edge to the newly created AtlasVertex from the parent
String relationshipLabel = GraphHelper.getTraitLabel(entityType.getName(), traitName);
graphHelper.getOrCreateEdge(parentInstanceVertex, traitInstanceVertex, relationshipLabel);
}
/******************************************** PRIMITIVES **************************************************/
- private void mapPrimitiveOrEnumToVertex(ITypedInstance typedInstance, Vertex instanceVertex,
+ private void mapPrimitiveOrEnumToVertex(ITypedInstance typedInstance, AtlasVertex instanceVertex,
AttributeInfo attributeInfo) throws AtlasException {
Object attrValue = typedInstance.get(attributeInfo.name);
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/repository/src/main/java/org/apache/atlas/repository/typestore/GraphBackedTypeStore.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/typestore/GraphBackedTypeStore.java b/repository/src/main/java/org/apache/atlas/repository/typestore/GraphBackedTypeStore.java
index b7e706f..ee63061 100755
--- a/repository/src/main/java/org/apache/atlas/repository/typestore/GraphBackedTypeStore.java
+++ b/repository/src/main/java/org/apache/atlas/repository/typestore/GraphBackedTypeStore.java
@@ -18,25 +18,29 @@
package org.apache.atlas.repository.typestore;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.thinkaurelius.titan.core.TitanGraph;
-import com.tinkerpop.blueprints.Direction;
-import com.tinkerpop.blueprints.Edge;
-import com.tinkerpop.blueprints.Vertex;
-import org.apache.atlas.AtlasConstants;
+import static org.apache.atlas.repository.graph.GraphHelper.setProperty;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
import org.apache.atlas.AtlasException;
import org.apache.atlas.GraphTransaction;
import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graph.GraphHelper;
-import org.apache.atlas.repository.graph.GraphProvider;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.types.AttributeDefinition;
import org.apache.atlas.typesystem.types.AttributeInfo;
import org.apache.atlas.typesystem.types.ClassType;
import org.apache.atlas.typesystem.types.DataTypes;
+import org.apache.atlas.typesystem.types.DataTypes.TypeCategory;
import org.apache.atlas.typesystem.types.EnumType;
import org.apache.atlas.typesystem.types.EnumTypeDefinition;
import org.apache.atlas.typesystem.types.EnumValue;
@@ -53,13 +57,10 @@ import org.codehaus.jettison.json.JSONException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import static org.apache.atlas.repository.graph.GraphHelper.setProperty;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
@Singleton
public class GraphBackedTypeStore implements ITypeStore {
@@ -69,13 +70,13 @@ public class GraphBackedTypeStore implements ITypeStore {
private static Logger LOG = LoggerFactory.getLogger(GraphBackedTypeStore.class);
- private final TitanGraph titanGraph;
+ private final AtlasGraph graph;
private GraphHelper graphHelper = GraphHelper.getInstance();
@Inject
- public GraphBackedTypeStore(GraphProvider<TitanGraph> graphProvider) {
- titanGraph = graphProvider.get();
+ public GraphBackedTypeStore() {
+ graph = AtlasGraphProvider.getGraphInstance();
}
@Override
@@ -91,14 +92,14 @@ public class GraphBackedTypeStore implements ITypeStore {
case STRUCT:
StructType structType = (StructType) dataType;
- storeInGraph(typeSystem, dataType.getTypeCategory(), dataType.getName(), dataType.getDescription(), dataType.getVersion(),
+ storeInGraph(typeSystem, dataType.getTypeCategory(), dataType.getName(), dataType.getDescription(),
ImmutableList.copyOf(structType.infoToNameMap.keySet()), ImmutableSet.<String>of());
break;
case TRAIT:
case CLASS:
HierarchicalType type = (HierarchicalType) dataType;
- storeInGraph(typeSystem, dataType.getTypeCategory(), dataType.getName(), type.getDescription(), type.getVersion(), type.immediateAttrs,
+ storeInGraph(typeSystem, dataType.getTypeCategory(), dataType.getName(), type.getDescription(), type.immediateAttrs,
type.superTypes);
break;
@@ -109,14 +110,14 @@ public class GraphBackedTypeStore implements ITypeStore {
}
private void storeInGraph(EnumType dataType) {
- Vertex vertex = createVertex(dataType.getTypeCategory(), dataType.getName(), dataType.getDescription(), dataType.getVersion());
+ AtlasVertex vertex = createVertex(dataType.getTypeCategory(), dataType.getName(), dataType.getDescription()); // createVertex reuses the vertex found by findVertex, if any
List<String> values = new ArrayList<>(dataType.values().size());
for (EnumValue enumValue : dataType.values()) {
String key = getPropertyKey(dataType.getName(), enumValue.value);
setProperty(vertex, key, enumValue.ordinal);
values.add(enumValue.value);
}
setProperty(vertex, getPropertyKey(dataType.getName()), values);
}
private String getPropertyKey(String name) {
@@ -131,9 +132,9 @@ public class GraphBackedTypeStore implements ITypeStore {
return PROPERTY_PREFIX + "edge." + parent + "." + child;
}
- private void storeInGraph(TypeSystem typeSystem, DataTypes.TypeCategory category, String typeName, String typeDescription, String typeVersion,
+ private void storeInGraph(TypeSystem typeSystem, DataTypes.TypeCategory category, String typeName, String typeDescription,
ImmutableList<AttributeInfo> attributes, ImmutableSet<String> superTypes) throws AtlasException {
- Vertex vertex = createVertex(category, typeName, typeDescription, typeVersion);
+ AtlasVertex vertex = createVertex(category, typeName, typeDescription);
List<String> attrNames = new ArrayList<>();
if (attributes != null) {
for (AttributeInfo attribute : attributes) {
@@ -153,18 +154,18 @@ public class GraphBackedTypeStore implements ITypeStore {
if (superTypes != null) {
for (String superTypeName : superTypes) {
HierarchicalType superType = typeSystem.getDataType(HierarchicalType.class, superTypeName);
- Vertex superVertex = createVertex(superType.getTypeCategory(), superTypeName, superType.getDescription(), AtlasConstants.DEFAULT_TYPE_VERSION);
+ AtlasVertex superVertex = createVertex(superType.getTypeCategory(), superTypeName, superType.getDescription());
graphHelper.getOrCreateEdge(vertex, superVertex, SUPERTYPE_EDGE_LABEL);
}
}
}
- private void addReferencesForAttribute(TypeSystem typeSystem, Vertex vertex, AttributeInfo attribute)
+ private void addReferencesForAttribute(TypeSystem typeSystem, AtlasVertex vertex, AttributeInfo attribute)
throws AtlasException {
ImmutableList<String> coreTypes = typeSystem.getCoreTypes();
List<IDataType> attrDataTypes = new ArrayList<>();
IDataType attrDataType = attribute.dataType();
- String vertexTypeName = vertex.getProperty(Constants.TYPENAME_PROPERTY_KEY);
+ String vertexTypeName = GraphHelper.getSingleValuedProperty(vertex, Constants.TYPENAME_PROPERTY_KEY, String.class);
switch (attrDataType.getTypeCategory()) {
case ARRAY:
@@ -201,7 +202,7 @@ public class GraphBackedTypeStore implements ITypeStore {
for (IDataType attrType : attrDataTypes) {
if (!coreTypes.contains(attrType.getName())) {
- Vertex attrVertex = createVertex(attrType.getTypeCategory(), attrType.getName(), attrType.getDescription(), attrType.getVersion());
+ AtlasVertex attrVertex = createVertex(attrType.getTypeCategory(), attrType.getName(), attrType.getDescription());
String label = getEdgeLabel(vertexTypeName, attribute.name);
graphHelper.getOrCreateEdge(vertex, attrVertex, label);
}
@@ -213,7 +214,7 @@ public class GraphBackedTypeStore implements ITypeStore {
public TypesDef restore() throws AtlasException {
//Get all vertices for type system
Iterator vertices =
- titanGraph.query().has(Constants.VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE).vertices().iterator();
+ graph.query().has(Constants.VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE).vertices().iterator();
return getTypesFromVertices(vertices);
}
@@ -221,25 +222,24 @@ public class GraphBackedTypeStore implements ITypeStore {
@Override
@GraphTransaction
public TypesDef restoreType(String typeName) throws AtlasException {
- // Get vertex for the specified type name.
+ // Query for type vertices (vertex type VERTEX_TYPE) whose type-name property equals typeName.
Iterator vertices =
- titanGraph.query().has(Constants.VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE).has(Constants.TYPENAME_PROPERTY_KEY, typeName).vertices().iterator();
+ graph.query().has(Constants.VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE).has(Constants.TYPENAME_PROPERTY_KEY, typeName).vertices().iterator();
return getTypesFromVertices(vertices);
}
- private TypesDef getTypesFromVertices(Iterator vertices) throws AtlasException {
+ private TypesDef getTypesFromVertices(Iterator<AtlasVertex> vertices) throws AtlasException {
ImmutableList.Builder<EnumTypeDefinition> enums = ImmutableList.builder();
ImmutableList.Builder<StructTypeDefinition> structs = ImmutableList.builder();
ImmutableList.Builder<HierarchicalTypeDefinition<ClassType>> classTypes = ImmutableList.builder();
ImmutableList.Builder<HierarchicalTypeDefinition<TraitType>> traits = ImmutableList.builder();
while (vertices.hasNext()) {
- Vertex vertex = (Vertex) vertices.next();
- DataTypes.TypeCategory typeCategory = vertex.getProperty(Constants.TYPE_CATEGORY_PROPERTY_KEY);
- String typeName = vertex.getProperty(Constants.TYPENAME_PROPERTY_KEY);
- String typeDescription = vertex.getProperty(Constants.TYPEDESCRIPTION_PROPERTY_KEY);
- String typeVersion = vertex.getProperty(Constants.TYPEVERSION_PROPERTY_KEY);
+ AtlasVertex vertex = vertices.next();
+ DataTypes.TypeCategory typeCategory = GraphHelper.getSingleValuedProperty(vertex, Constants.TYPE_CATEGORY_PROPERTY_KEY, TypeCategory.class);
+ String typeName = GraphHelper.getSingleValuedProperty(vertex, Constants.TYPENAME_PROPERTY_KEY, String.class);
+ String typeDescription = GraphHelper.getSingleValuedProperty(vertex, Constants.TYPEDESCRIPTION_PROPERTY_KEY, String.class);
LOG.info("Restoring type {}.{}.{}", typeCategory, typeName, typeDescription);
switch (typeCategory) {
case ENUM:
@@ -248,19 +248,19 @@ public class GraphBackedTypeStore implements ITypeStore {
case STRUCT:
AttributeDefinition[] attributes = getAttributes(vertex, typeName);
- structs.add(new StructTypeDefinition(typeName, typeDescription, typeVersion, attributes));
+ structs.add(new StructTypeDefinition(typeName, typeDescription, attributes));
break;
case CLASS:
ImmutableSet<String> superTypes = getSuperTypes(vertex);
attributes = getAttributes(vertex, typeName);
- classTypes.add(new HierarchicalTypeDefinition(ClassType.class, typeName, typeDescription, typeVersion, superTypes, attributes));
+ classTypes.add(new HierarchicalTypeDefinition(ClassType.class, typeName, typeDescription, superTypes, attributes));
break;
case TRAIT:
superTypes = getSuperTypes(vertex);
attributes = getAttributes(vertex, typeName);
- traits.add(new HierarchicalTypeDefinition(TraitType.class, typeName, typeDescription, typeVersion, superTypes, attributes));
+ traits.add(new HierarchicalTypeDefinition(TraitType.class, typeName, typeDescription, superTypes, attributes));
break;
default:
@@ -270,37 +270,40 @@ public class GraphBackedTypeStore implements ITypeStore {
return TypesUtil.getTypesDef(enums.build(), structs.build(), traits.build(), classTypes.build());
}
- private EnumTypeDefinition getEnumType(Vertex vertex) {
- String typeName = vertex.getProperty(Constants.TYPENAME_PROPERTY_KEY);
- String typeDescription = vertex.getProperty(Constants.TYPEDESCRIPTION_PROPERTY_KEY);
- String typeVersion = vertex.getProperty(Constants.TYPEVERSION_PROPERTY_KEY);
+ private EnumTypeDefinition getEnumType(AtlasVertex vertex) throws AtlasException { // rebuilds an enum definition from the properties of its type vertex
+ String typeName = GraphHelper.getSingleValuedProperty(vertex, Constants.TYPENAME_PROPERTY_KEY, String.class);
+ String typeDescription = GraphHelper.getSingleValuedProperty(vertex, Constants.TYPEDESCRIPTION_PROPERTY_KEY, String.class);
List<EnumValue> enumValues = new ArrayList<>();
- List<String> values = graphHelper.getProperty(vertex, getPropertyKey(typeName));
+ List<String> values = vertex.getListProperty(getPropertyKey(typeName)); // value names, as written under the type-level key by storeInGraph(EnumType)
for (String value : values) {
String valueProperty = getPropertyKey(typeName, value);
- enumValues.add(new EnumValue(value, (Integer) graphHelper.getProperty(vertex, valueProperty)));
+ enumValues.add(new EnumValue(value, GraphHelper.getSingleValuedProperty(vertex, valueProperty, Integer.class))); // the ordinal is read from the per-value property key
}
- return new EnumTypeDefinition(typeName, typeDescription, typeVersion, enumValues.toArray(new EnumValue[enumValues.size()]));
+ return new EnumTypeDefinition(typeName, typeDescription, enumValues.toArray(new EnumValue[enumValues.size()])); // the type version is no longer read or passed on
}
- private ImmutableSet<String> getSuperTypes(Vertex vertex) {
+ private ImmutableSet<String> getSuperTypes(AtlasVertex vertex) { // returns the type names reached over this vertex's outgoing SUPERTYPE_EDGE_LABEL edges
Set<String> superTypes = new HashSet<>();
- Iterator<Edge> edges = graphHelper.getOutGoingEdgesByLabel(vertex, SUPERTYPE_EDGE_LABEL);
+ Iterator<AtlasEdge> edges = vertex.getEdges(AtlasEdgeDirection.OUT, SUPERTYPE_EDGE_LABEL).iterator(); // edges are now fetched from the vertex itself rather than through graphHelper
while (edges.hasNext()) {
- Edge edge = edges.next();
- superTypes.add((String) edge.getVertex(Direction.IN).getProperty(Constants.TYPENAME_PROPERTY_KEY));
+ AtlasEdge edge = edges.next();
+ superTypes.add(edge.getInVertex().getProperty(Constants.TYPENAME_PROPERTY_KEY, String.class)); // record the type name stored on the edge's in-vertex
}
return ImmutableSet.copyOf(superTypes);
}
- private AttributeDefinition[] getAttributes(Vertex vertex, String typeName) throws AtlasException {
+ private AttributeDefinition[] getAttributes(AtlasVertex vertex, String typeName) throws AtlasException {
List<AttributeDefinition> attributes = new ArrayList<>();
- List<String> attrNames = graphHelper.getProperty(vertex, getPropertyKey(typeName));
+ List<String> attrNames = vertex.getListProperty(getPropertyKey(typeName));
if (attrNames != null) {
for (String attrName : attrNames) {
try {
String propertyKey = getPropertyKey(typeName, attrName);
- attributes.add(AttributeInfo.fromJson((String) graphHelper.getProperty(vertex, propertyKey)));
+ AttributeDefinition attrValue = AttributeInfo.fromJson((String) vertex.getJsonProperty(propertyKey));
+ if (attrValue != null)
+ {
+ attributes.add(attrValue);
+ }
} catch (JSONException e) {
throw new AtlasException(e);
}
@@ -315,24 +318,24 @@ public class GraphBackedTypeStore implements ITypeStore {
* @param typeName
* @return vertex
*/
- Vertex findVertex(DataTypes.TypeCategory category, String typeName) {
- LOG.debug("Finding vertex for {}.{}", category, typeName);
+ AtlasVertex findVertex(DataTypes.TypeCategory category, String typeName) {
+ LOG.debug("Finding AtlasVertex for {}.{}", category, typeName);
- Iterator results = titanGraph.query().has(Constants.TYPENAME_PROPERTY_KEY, typeName).vertices().iterator();
- Vertex vertex = null;
+ Iterator results = graph.query().has(Constants.TYPENAME_PROPERTY_KEY, typeName).vertices().iterator();
+ AtlasVertex vertex = null;
if (results != null && results.hasNext()) {
- //There should be just one vertex with the given typeName
- vertex = (Vertex) results.next();
+ //There should be just one AtlasVertex with the given typeName
+ vertex = (AtlasVertex) results.next();
}
return vertex;
}
- private Vertex createVertex(DataTypes.TypeCategory category, String typeName, String typeDescription, String typeVersion) {
- Vertex vertex = findVertex(category, typeName);
+ private AtlasVertex createVertex(DataTypes.TypeCategory category, String typeName, String typeDescription) {
+ AtlasVertex vertex = findVertex(category, typeName);
if (vertex == null) {
LOG.debug("Adding vertex {}{}", PROPERTY_PREFIX, typeName);
- vertex = titanGraph.addVertex(null);
- setProperty(vertex, Constants.VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE); // Mark as type vertex
+ vertex = graph.addVertex();
+ setProperty(vertex, Constants.VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE); // Mark as type AtlasVertex
setProperty(vertex, Constants.TYPE_CATEGORY_PROPERTY_KEY, category);
setProperty(vertex, Constants.TYPENAME_PROPERTY_KEY, typeName);
}
@@ -344,16 +347,6 @@ public class GraphBackedTypeStore implements ITypeStore {
} else {
LOG.debug(" type description is null ");
}
-
- if (typeVersion != null) {
- String oldVersion = getPropertyKey(Constants.TYPEVERSION_PROPERTY_KEY);
- if (!typeVersion.equals(oldVersion)) {
- setProperty(vertex, Constants.TYPEVERSION_PROPERTY_KEY, typeVersion);
- LOG.info(" updating type {} to version {}", typeName, typeVersion);
- }
- } else {
- LOG.info(" type version is null ");
- }
return vertex;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
index 026e98c..5b4eb0e 100755
--- a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
+++ b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
@@ -18,10 +18,18 @@
package org.apache.atlas.services;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.inject.Provider;
+import static org.apache.atlas.AtlasClient.PROCESS_ATTRIBUTE_INPUTS;
+import static org.apache.atlas.AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
@@ -36,6 +44,7 @@ import org.apache.atlas.query.QueryParser;
import org.apache.atlas.repository.MetadataRepository;
import org.apache.atlas.repository.RepositoryException;
import org.apache.atlas.repository.audit.EntityAuditRepository;
+import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.typestore.ITypeStore;
import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
@@ -60,29 +69,22 @@ import org.apache.atlas.typesystem.types.Multiplicity;
import org.apache.atlas.typesystem.types.StructTypeDefinition;
import org.apache.atlas.typesystem.types.TraitType;
import org.apache.atlas.typesystem.types.TypeSystem;
-import org.apache.atlas.typesystem.types.ValueConversionException;
import org.apache.atlas.typesystem.types.cache.TypeCache;
import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.apache.atlas.utils.ParamChecker;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
-import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.collection.Set;
-import javax.inject.Inject;
-import javax.inject.Singleton;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Provider;
+
-import static org.apache.atlas.AtlasClient.PROCESS_ATTRIBUTE_INPUTS;
-import static org.apache.atlas.AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS;
/**
* Simple wrapper over TypeSystem and MetadataRepository services with hooks
@@ -118,8 +120,9 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang
this(repository, typeStore, typesRegistrar, typeListenerProviders, entityListenerProviders,
TypeSystem.getInstance(), ApplicationProperties.get(), typeCache);
}
-
- DefaultMetadataService(final MetadataRepository repository, final ITypeStore typeStore,
+
+ //for testing only
+ public DefaultMetadataService(final MetadataRepository repository, final ITypeStore typeStore,
final IBootstrapTypesRegistrar typesRegistrar,
final Collection<Provider<TypesChangeListener>> typeListenerProviders,
final Collection<Provider<EntityChangeListener>> entityListenerProviders,
@@ -153,8 +156,6 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang
restoreTypeSystem();
}
- AtlasPatchHandler.handlePatches(this, typeSystem);
-
maxAuditResults = configuration.getShort(CONFIG_MAX_AUDIT_RESULTS, DEFAULT_MAX_AUDIT_RESULTS);
}
@@ -243,6 +244,7 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang
typeDefinition = ParamChecker.notEmpty(typeDefinition, "type definition");
TypesDef typesDef = validateTypeDefinition(typeDefinition);
+
try {
final TypeSystem.TransientTypeSystem transientTypeSystem = typeSystem.createTransientTypeSystem(typesDef, isUpdate);
final Map<String, IDataType> typesAdded = transientTypeSystem.getTypesAdded();
@@ -334,40 +336,13 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang
return guids;
}
- private ITypedReferenceableInstance[] deserializeClassInstances(String entityInstanceDefinition)
- throws AtlasException {
- try {
- JSONArray referableInstances = new JSONArray(entityInstanceDefinition);
- ITypedReferenceableInstance[] instances = new ITypedReferenceableInstance[referableInstances.length()];
- for (int index = 0; index < referableInstances.length(); index++) {
- Referenceable entityInstance =
- InstanceSerialization.fromJsonReferenceable(referableInstances.getString(index), true);
- ITypedReferenceableInstance typedInstrance = getTypedReferenceableInstance(entityInstance);
- instances[index] = typedInstrance;
- }
- return instances;
- } catch(ValueConversionException | TypeNotFoundException e) {
- throw e;
- } catch (Exception e) { // exception from deserializer
- LOG.error("Unable to deserialize json={}", entityInstanceDefinition, e);
- throw new IllegalArgumentException("Unable to deserialize json", e);
- }
+ private ITypedReferenceableInstance[] deserializeClassInstances(String entityInstanceDefinition) throws AtlasException {
+ return GraphHelper.deserializeClassInstances(typeSystem, entityInstanceDefinition); // the inline JSON parsing above is replaced by a call to GraphHelper, passing this service's typeSystem
}
-
+
@Override
public ITypedReferenceableInstance getTypedReferenceableInstance(Referenceable entityInstance) throws AtlasException {
- final String entityTypeName = ParamChecker.notEmpty(entityInstance.getTypeName(), "Entity type cannot be null");
-
- ClassType entityType = typeSystem.getDataType(ClassType.class, entityTypeName);
-
- //Both assigned id and values are required for full update
- //classtype.convert() will remove values if id is assigned. So, set temp id, convert and
- // then replace with original id
- Id origId = entityInstance.getId();
- entityInstance.replaceWithNewId(new Id(entityInstance.getTypeName()));
- ITypedReferenceableInstance typedInstrance = entityType.convert(entityInstance, Multiplicity.REQUIRED);
- ((ReferenceableInstance)typedInstrance).replaceWithNewId(origId);
- return typedInstrance;
+ return GraphHelper.getTypedReferenceableInstance(typeSystem, entityInstance); // the inline conversion above is replaced by a call to GraphHelper, passing this service's typeSystem
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java b/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java
new file mode 100644
index 0000000..a270b97
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.util;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.repository.audit.EntityAuditRepository;
+import org.apache.atlas.repository.audit.HBaseBasedAuditRepository;
+import org.apache.atlas.repository.graph.DeleteHandler;
+import org.apache.atlas.repository.graph.SoftDeleteHandler;
+import org.apache.atlas.repository.graphdb.GraphDatabase;
+import org.apache.atlas.repository.typestore.GraphBackedTypeStore;
+import org.apache.atlas.typesystem.types.cache.DefaultTypeCache;
+import org.apache.atlas.typesystem.types.cache.TypeCache;
+import org.apache.commons.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Atlas configuration for repository project
+ *
+ */
+public class AtlasRepositoryConfiguration {
+
+ private static Logger LOG = LoggerFactory.getLogger(AtlasRepositoryConfiguration.class);
+
+ public static final String TYPE_CACHE_IMPLEMENTATION_PROPERTY = "atlas.TypeCache.impl";
+
+ @SuppressWarnings("unchecked")
+ public static Class<? extends TypeCache> getTypeCache() {
+
+ // Get the type cache implementation class from Atlas configuration.
+ try {
+ Configuration config = ApplicationProperties.get();
+ return ApplicationProperties.getClass(config, TYPE_CACHE_IMPLEMENTATION_PROPERTY,
+ DefaultTypeCache.class.getName(), TypeCache.class);
+ } catch (AtlasException e) {
+ LOG.error("Error loading typecache ", e);
+ return DefaultTypeCache.class;
+ }
+ }
+ private static final String AUDIT_REPOSITORY_IMPLEMENTATION_PROPERTY = "atlas.EntityAuditRepository.impl";
+
+ @SuppressWarnings("unchecked")
+ public static Class<? extends EntityAuditRepository> getAuditRepositoryImpl() {
+ try {
+ Configuration config = ApplicationProperties.get();
+ return ApplicationProperties.getClass(config,
+ AUDIT_REPOSITORY_IMPLEMENTATION_PROPERTY, HBaseBasedAuditRepository.class.getName(), EntityAuditRepository.class);
+ } catch (AtlasException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static final String DELETE_HANDLER_IMPLEMENTATION_PROPERTY = "atlas.DeleteHandler.impl";
+
+ @SuppressWarnings("unchecked")
+ public static Class<? extends DeleteHandler> getDeleteHandlerImpl() {
+ try {
+ Configuration config = ApplicationProperties.get();
+ return ApplicationProperties.getClass(config,
+ DELETE_HANDLER_IMPLEMENTATION_PROPERTY, SoftDeleteHandler.class.getName(), DeleteHandler.class);
+ } catch (AtlasException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static final String GRAPH_DATABASE_IMPLEMENTATION_PROPERTY = "atlas.graphdb.backend";
+ private static final String DEFAULT_GRAPH_DATABASE_IMPLEMENTATION_CLASS = "org.apache.atlas.repository.graphdb.titan0.Titan0GraphDatabase";
+
+ @SuppressWarnings("unchecked")
+ public static Class<? extends GraphDatabase> getGraphDatabaseImpl() {
+ try {
+ Configuration config = ApplicationProperties.get();
+ return ApplicationProperties.getClass(config,
+ GRAPH_DATABASE_IMPLEMENTATION_PROPERTY, DEFAULT_GRAPH_DATABASE_IMPLEMENTATION_CLASS, GraphDatabase.class);
+ } catch (AtlasException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala
----------------------------------------------------------------------
diff --git a/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala b/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala
old mode 100755
new mode 100644
index c4621cd..569d3f9
--- a/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala
+++ b/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala
@@ -20,7 +20,7 @@ package org.apache.atlas.query
import java.util
-import com.thinkaurelius.titan.core.TitanGraph
+import org.apache.atlas.repository.graphdb.AtlasGraph
import org.apache.atlas.query.Expressions._
import org.apache.atlas.typesystem.ITypedStruct
import org.apache.atlas.typesystem.json.{InstanceSerialization, Serialization}
@@ -116,7 +116,7 @@ trait ClosureQuery {
def withPath : Boolean
def persistenceStrategy: GraphPersistenceStrategies
- def g: TitanGraph
+ def g: AtlasGraph[_,_]
def pathExpr : Expressions.Expression = {
closureRelation.tail.foldLeft(closureRelation.head.toExpr)((b,a) => b.field(a.toFieldName))
@@ -184,8 +184,8 @@ trait ClosureQuery {
* foreach resultRow
* for each Path entry
* add an entry in the edges Map
- * add an entry for the Src Vertex to the vertex Map
- * add an entry for the Dest Vertex to the vertex Map
+ * add an entry for the Src AtlasVertex to the vertex Map
+ * add an entry for the Dest AtlasVertex to the vertex Map
*/
res.rows.map(_.asInstanceOf[StructInstance]).foreach { r =>
@@ -207,7 +207,7 @@ trait ClosureQuery {
}
currVertex = nextVertex
}
- val vertex = r.get(TypeUtils.ResultWithPathStruct.resultAttrName)
+ val AtlasVertex = r.get(TypeUtils.ResultWithPathStruct.resultAttrName)
vertices.put(id(srcVertex), vertexStruct(srcVertex,
r.get(TypeUtils.ResultWithPathStruct.resultAttrName).asInstanceOf[ITypedStruct],
s"${SRC_PREFIX}_"))
@@ -242,6 +242,7 @@ trait SingleInstanceClosureQuery[T] extends ClosureQuery {
}
}
+import scala.language.existentials;
/**
* A ClosureQuery to compute '''Lineage''' for Hive tables. Assumes the Lineage relation is captured in a ''CTAS''
* type, and the table relations are captured as attributes from a CTAS instance to Table instances.
@@ -266,7 +267,7 @@ case class InputLineageClosureQuery(tableTypeName : String,
selectAttributes : Option[List[String]],
withPath : Boolean,
persistenceStrategy: GraphPersistenceStrategies,
- g: TitanGraph
+ g: AtlasGraph[_,_]
) extends SingleInstanceClosureQuery[String] {
val closureType : String = tableTypeName
@@ -306,7 +307,7 @@ case class OutputLineageClosureQuery(tableTypeName : String,
selectAttributes : Option[List[String]],
withPath : Boolean,
persistenceStrategy: GraphPersistenceStrategies,
- g: TitanGraph
+ g: AtlasGraph[_,_]
) extends SingleInstanceClosureQuery[String] {
val closureType : String = tableTypeName