You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sh...@apache.org on 2016/05/25 18:34:36 UTC
[3/3] incubator-atlas git commit: ATLAS-716 Entity update/delete
notifications (shwethags)
ATLAS-716 Entity update/delete notifications (shwethags)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/705014eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/705014eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/705014eb
Branch: refs/heads/master
Commit: 705014eb3352180ff8f2cac05dbbc0809b421d8c
Parents: 153fc36
Author: Shwetha GS <ss...@hortonworks.com>
Authored: Thu May 26 00:02:32 2016 +0530
Committer: Shwetha GS <ss...@hortonworks.com>
Committed: Thu May 26 00:04:22 2016 +0530
----------------------------------------------------------------------
.../atlas/hive/bridge/HiveMetaStoreBridge.java | 2 +-
.../org/apache/atlas/hive/hook/HiveHookIT.java | 65 +++--
.../main/java/org/apache/atlas/AtlasClient.java | 112 ++++++--
.../java/org/apache/atlas/EntityAuditEvent.java | 40 ++-
.../src/main/java/org/apache/atlas/SerDe.java | 79 ++++++
.../java/org/apache/atlas/AtlasClientTest.java | 26 ++
.../atlas/notification/MessageVersion.java | 5 +
.../notification/NotificationInterface.java | 2 +-
.../VersionedMessageDeserializer.java | 12 +-
.../NotificationEntityChangeListener.java | 106 -------
.../AbstractNotificationConsumerTest.java | 14 +-
release-log.txt | 1 +
.../atlas/repository/MetadataRepository.java | 8 +-
.../repository/audit/EntityAuditListener.java | 5 +-
.../audit/HBaseBasedAuditRepository.java | 3 +
.../atlas/repository/graph/DeleteHandler.java | 136 +++++----
.../graph/GraphBackedMetadataRepository.java | 22 +-
.../atlas/repository/graph/GraphHelper.java | 59 ++--
.../graph/GraphToTypedInstanceMapper.java | 13 +-
.../repository/graph/HardDeleteHandler.java | 8 +-
.../repository/graph/SoftDeleteHandler.java | 32 ++-
.../graph/TypedInstanceToGraphMapper.java | 181 ++++++------
.../typestore/GraphBackedTypeStore.java | 21 +-
.../atlas/services/DefaultMetadataService.java | 72 ++---
.../audit/AuditRepositoryTestBase.java | 10 +-
...hBackedMetadataRepositoryDeleteTestBase.java | 221 +++++++++------
.../GraphBackedRepositoryHardDeleteTest.java | 63 +++-
.../GraphBackedRepositorySoftDeleteTest.java | 99 ++++++-
.../service/DefaultMetadataServiceTest.java | 284 ++++++++++++-------
.../java/org/apache/atlas/RequestContext.java | 19 +-
.../apache/atlas/services/MetadataService.java | 21 +-
.../apache/atlas/typesystem/Referenceable.java | 2 +-
.../java/org/apache/atlas/LocalAtlasClient.java | 22 +-
.../NotificationEntityChangeListener.java | 161 +++++++++++
.../atlas/web/resources/EntityResource.java | 97 +++----
.../apache/atlas/web/service/ServiceModule.java | 2 +-
.../org/apache/atlas/LocalAtlasClientTest.java | 21 +-
.../notification/EntityNotificationIT.java | 2 -
.../NotificationEntityChangeListenerTest.java | 90 ++++++
.../web/resources/EntityJerseyResourceIT.java | 48 ++--
.../atlas/web/service/CuratorFactoryTest.java | 5 -
41 files changed, 1443 insertions(+), 748 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index 254e150..fe07d73 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -188,7 +188,7 @@ public class HiveMetaStoreBridge {
List<String> guids = getAtlasClient().createEntity(entityJSON);
LOG.debug("created instance for type " + typeName + ", guid: " + guids);
- return new Referenceable(guids.get(0), referenceable.getTypeName(), null);
+ return new Referenceable(guids.get(guids.size() - 1), referenceable.getTypeName(), null);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
index 70100f1..84d9a52 100755
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
@@ -29,7 +29,6 @@ import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.hive.rewrite.HiveASTRewriter;
-import org.apache.atlas.hive.rewrite.RewriteException;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.persistence.Id;
@@ -66,6 +65,7 @@ import java.util.Map;
import static org.apache.atlas.hive.hook.HiveHook.lower;
import static org.apache.atlas.hive.hook.HiveHook.normalize;
+import static org.apache.atlas.hive.model.HiveDataModelGenerator.NAME;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;
@@ -197,8 +197,8 @@ public class HiveHookIT {
Assert.assertEquals(tableRef.get(HiveDataModelGenerator.TABLE_TYPE_ATTR), TableType.MANAGED_TABLE.name());
Assert.assertEquals(tableRef.get(HiveDataModelGenerator.COMMENT), "table comment");
String entityName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName);
- Assert.assertEquals(tableRef.get(HiveDataModelGenerator.NAME), entityName);
- Assert.assertEquals(tableRef.get(HiveDataModelGenerator.NAME), "default." + tableName.toLowerCase() + "@" + CLUSTER_NAME);
+ Assert.assertEquals(tableRef.get(NAME), entityName);
+ Assert.assertEquals(tableRef.get(NAME), "default." + tableName.toLowerCase() + "@" + CLUSTER_NAME);
Table t = hiveMetaStoreBridge.hiveClient.getTable(DEFAULT_DB, tableName);
long createTime = Long.parseLong(t.getMetadata().getProperty(hive_metastoreConstants.DDL_TIME)) * HiveMetaStoreBridge.MILLIS_CONVERT_FACTOR;
@@ -631,7 +631,7 @@ public class HiveHookIT {
final String newDBName = createDatabase();
assertTableIsRegistered(DEFAULT_DB, tableName);
- String columnGuid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), HiveDataModelGenerator.NAME));
+ String columnGuid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), NAME));
String sdGuid = assertSDIsRegistered(HiveMetaStoreBridge.getStorageDescQFName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName)), null);
assertDatabaseIsRegistered(newDBName);
@@ -649,10 +649,10 @@ public class HiveHookIT {
String query = String.format("alter table %s rename to %s", DEFAULT_DB + "." + tableName, newDBName + "." + newTableName);
runCommand(query);
- String newColGuid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, newDBName, newTableName), HiveDataModelGenerator.NAME));
+ String newColGuid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, newDBName, newTableName), NAME));
Assert.assertEquals(newColGuid, columnGuid);
- assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, newDBName, tableName), HiveDataModelGenerator.NAME));
+ assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, newDBName, tableName), NAME));
assertTrait(columnGuid, colTraitDetails);
String newSdGuid = assertSDIsRegistered(HiveMetaStoreBridge.getStorageDescQFName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, newDBName, newTableName)), null);
@@ -676,7 +676,16 @@ public class HiveHookIT {
private List<Referenceable> getColumns(String dbName, String tableName) throws Exception {
String tableId = assertTableIsRegistered(dbName, tableName);
Referenceable tableRef = atlasClient.getEntity(tableId);
- return ((List<Referenceable>)tableRef.get(HiveDataModelGenerator.COLUMNS));
+
+ //with soft delete, the deleted columns are returned as well. So, filter the deleted ones
+ List<Referenceable> columns = ((List<Referenceable>) tableRef.get(HiveDataModelGenerator.COLUMNS));
+ List<Referenceable> activeColumns = new ArrayList<>();
+ for (Referenceable col : columns) {
+ if (col.getId().getState() == Id.EntityState.ACTIVE) {
+ activeColumns.add(col);
+ }
+ }
+ return activeColumns;
}
@@ -723,21 +732,15 @@ public class HiveHookIT {
colDropped));
//Verify the number of columns present in the table
- assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
- @Override
- public void assertOnEntity(Referenceable tableRef) throws Exception {
- List<Referenceable> columns = (List<Referenceable>) tableRef.get(HiveDataModelGenerator.COLUMNS);
- Assert.assertEquals(columns.size(), 1);
- Assert.assertEquals(columns.get(0).get(HiveDataModelGenerator.NAME), HiveDataModelGenerator.NAME);
-
- }
- });
+ List<Referenceable> columns = getColumns(DEFAULT_DB, tableName);
+ assertEquals(columns.size(), 1);
+ assertEquals(columns.get(0).get(NAME), "name");
}
@Test
public void testAlterTableChangeColumn() throws Exception {
//Change name
- String oldColName = HiveDataModelGenerator.NAME;
+ String oldColName = NAME;
String newColName = "name1";
String tableName = createTable();
String query = String.format("alter table %s change %s %s string", tableName, oldColName, newColName);
@@ -818,8 +821,8 @@ public class HiveHookIT {
@Override
public void assertOnEntity(Referenceable entity) throws Exception {
List<Referenceable> columns = (List<Referenceable>) entity.get(HiveDataModelGenerator.COLUMNS);
- assertEquals(columns.get(0).get(HiveDataModelGenerator.NAME), finalNewColName);
- assertEquals(columns.get(1).get(HiveDataModelGenerator.NAME), "id");
+ assertEquals(columns.get(0).get(NAME), finalNewColName);
+ assertEquals(columns.get(1).get(NAME), "id");
}
}
);
@@ -846,8 +849,8 @@ public class HiveHookIT {
@Override
public void assertOnEntity(Referenceable entity) throws Exception {
List<Referenceable> columns = (List<Referenceable>) entity.get(HiveDataModelGenerator.COLUMNS);
- assertEquals(columns.get(1).get(HiveDataModelGenerator.NAME), finalNewColName2);
- assertEquals(columns.get(0).get(HiveDataModelGenerator.NAME), "id");
+ assertEquals(columns.get(1).get(NAME), finalNewColName2);
+ assertEquals(columns.get(0).get(NAME), "id");
}
}
);
@@ -955,7 +958,7 @@ public class HiveHookIT {
Referenceable hdfsPathRef = atlasClient.getEntity(hdfsPathId);
Assert.assertEquals(hdfsPathRef.get("path"), testPathNormed);
- Assert.assertEquals(hdfsPathRef.get(HiveDataModelGenerator.NAME), testPathNormed);
+ Assert.assertEquals(hdfsPathRef.get(NAME), testPathNormed);
// Assert.assertEquals(hdfsPathRef.get("name"), new Path(testPath).getName());
Assert.assertEquals(hdfsPathRef.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), testPathNormed);
@@ -964,7 +967,7 @@ public class HiveHookIT {
private String assertHDFSPathIsRegistered(String path) throws Exception {
LOG.debug("Searching for hdfs path {}", path);
- return assertEntityIsRegistered(FSDataTypes.HDFS_PATH().toString(), HiveDataModelGenerator.NAME, path, null);
+ return assertEntityIsRegistered(FSDataTypes.HDFS_PATH().toString(), NAME, path, null);
}
@Test
@@ -1014,7 +1017,7 @@ public class HiveHookIT {
ImmutableList<String> cols = ImmutableList.of("id");
runBucketSortQuery(tableName, 5, cols, cols);
- cols = ImmutableList.of("id", HiveDataModelGenerator.NAME);
+ cols = ImmutableList.of("id", NAME);
runBucketSortQuery(tableName, 2, cols, cols);
}
@@ -1077,7 +1080,7 @@ public class HiveHookIT {
assertTableIsRegistered(DEFAULT_DB, tableName);
assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), "id"));
- assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), HiveDataModelGenerator.NAME));
+ assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), NAME));
final String query = String.format("drop table %s ", tableName);
runCommand(query);
@@ -1086,7 +1089,7 @@ public class HiveHookIT {
"id"));
assertColumnIsNotRegistered(HiveMetaStoreBridge
.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName),
- HiveDataModelGenerator.NAME));
+ NAME));
assertTableIsNotRegistered(DEFAULT_DB, tableName);
}
@@ -1110,7 +1113,7 @@ public class HiveHookIT {
HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableNames[0]), "id"));
assertColumnIsNotRegistered(HiveMetaStoreBridge
.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableNames[0]),
- HiveDataModelGenerator.NAME));
+ NAME));
for(int i = 0; i < numTables; i++) {
assertTableIsNotRegistered(dbName, tableNames[i]);
@@ -1175,7 +1178,7 @@ public class HiveHookIT {
assertTableIsRegistered(DEFAULT_DB, viewName);
assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName), "id"));
- assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName), HiveDataModelGenerator.NAME));
+ assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName), NAME));
query = String.format("drop view %s ", viewName);
@@ -1185,7 +1188,7 @@ public class HiveHookIT {
"id"));
assertColumnIsNotRegistered(HiveMetaStoreBridge
.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName),
- HiveDataModelGenerator.NAME));
+ NAME));
assertTableIsNotRegistered(DEFAULT_DB, viewName);
}
@@ -1349,7 +1352,7 @@ public class HiveHookIT {
if (inputTblName != null) {
Referenceable inputTableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.name(), new HashMap<String, Object>() {{
- put(HiveDataModelGenerator.NAME, inputTblName);
+ put(NAME, inputTblName);
}});
inputs = new ArrayList<Referenceable>();
inputs.add(inputTableRef);
@@ -1357,7 +1360,7 @@ public class HiveHookIT {
List<Referenceable> outputs = null;
if (outputTblName != null) {
Referenceable outputTableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.name(), new HashMap<String, Object>() {{
- put(HiveDataModelGenerator.NAME, outputTblName);
+ put(NAME, outputTblName);
}});
outputs = new ArrayList<Referenceable>();
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/client/src/main/java/org/apache/atlas/AtlasClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/AtlasClient.java b/client/src/main/java/org/apache/atlas/AtlasClient.java
index 7e32cc2..be178dc 100755
--- a/client/src/main/java/org/apache/atlas/AtlasClient.java
+++ b/client/src/main/java/org/apache/atlas/AtlasClient.java
@@ -20,11 +20,14 @@ package org.apache.atlas;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.api.client.filter.HTTPBasicAuthFilter;
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
import org.apache.atlas.security.SecureClientUtils;
import org.apache.atlas.typesystem.Referenceable;
@@ -45,6 +48,7 @@ import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@@ -54,8 +58,10 @@ import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
-import com.sun.jersey.api.client.filter.HTTPBasicAuthFilter;
+import java.util.Map;
+
import static org.apache.atlas.security.SecurityProperties.TLS_ENABLED;
/**
@@ -65,9 +71,10 @@ public class AtlasClient {
private static final Logger LOG = LoggerFactory.getLogger(AtlasClient.class);
public static final String NAME = "name";
- public static final String GUID = "GUID";
public static final String TYPE = "type";
public static final String TYPENAME = "typeName";
+ public static final String GUID = "GUID";
+ public static final String ENTITIES = "entities";
public static final String DEFINITION = "definition";
public static final String ERROR = "error";
@@ -340,6 +347,61 @@ public class AtlasClient {
return service;
}
+ public static class EntityResult {
+ private static final Gson gson = new GsonBuilder().setPrettyPrinting().create();
+
+ public static final String OP_CREATED = "created";
+ public static final String OP_UPDATED = "updated";
+ public static final String OP_DELETED = "deleted";
+
+ Map<String, List<String>> entities = new HashMap<>();
+
+ public EntityResult() {
+ //For gson
+ }
+
+ public EntityResult(List<String> created, List<String> updated, List<String> deleted) {
+ add(OP_CREATED, created);
+ add(OP_UPDATED, updated);
+ add(OP_DELETED, deleted);
+ }
+
+ private void add(String type, List<String> list) {
+ if (list != null && list.size() > 0) {
+ entities.put(type, list);
+ }
+ }
+
+ private List<String> get(String type) {
+ List<String> list = entities.get(type);
+ if (list == null) {
+ list = new ArrayList<>();
+ }
+ return list;
+ }
+
+ public List<String> getCreatedEntities() {
+ return get(OP_CREATED);
+ }
+
+ public List<String> getUpdateEntities() {
+ return get(OP_UPDATED);
+ }
+
+ public List<String> getDeletedEntities() {
+ return get(OP_DELETED);
+ }
+
+ @Override
+ public String toString() {
+ return gson.toJson(this);
+ }
+
+ public static EntityResult fromString(String json) throws AtlasServiceException {
+ return gson.fromJson(json, EntityResult.class);
+ }
+ }
+
/**
* Return status of the service instance the client is pointing to.
*
@@ -562,11 +624,15 @@ public class AtlasClient {
protected List<String> createEntity(JSONArray entities) throws AtlasServiceException {
LOG.debug("Creating entities: {}", entities);
JSONObject response = callAPI(API.CREATE_ENTITY, entities.toString());
- List<String> results = extractResults(response, GUID, new ExtractOperation<String, String>());
+ List<String> results = extractEntityResult(response).getCreatedEntities();
LOG.debug("Create entities returned results: {}", results);
return results;
}
+ protected EntityResult extractEntityResult(JSONObject response) throws AtlasServiceException {
+ return EntityResult.fromString(response.toString());
+ }
+
/**
* Create the given entity
* @param entitiesAsJson entity(type instance) as json
@@ -601,19 +667,19 @@ public class AtlasClient {
* @return json array of guids which were updated/created
* @throws AtlasServiceException
*/
- public List<String> updateEntities(Referenceable... entities) throws AtlasServiceException {
+ public EntityResult updateEntities(Referenceable... entities) throws AtlasServiceException {
return updateEntities(Arrays.asList(entities));
}
- protected List<String> updateEntities(JSONArray entities) throws AtlasServiceException {
+ protected EntityResult updateEntities(JSONArray entities) throws AtlasServiceException {
LOG.debug("Updating entities: {}", entities);
JSONObject response = callAPI(API.UPDATE_ENTITY, entities.toString());
- List<String> results = extractResults(response, GUID, new ExtractOperation<String, String>());
+ EntityResult results = extractEntityResult(response);
LOG.debug("Update entities returned results: {}", results);
return results;
}
- public List<String> updateEntities(Collection<Referenceable> entities) throws AtlasServiceException {
+ public EntityResult updateEntities(Collection<Referenceable> entities) throws AtlasServiceException {
JSONArray entitiesArray = getEntitiesArray(entities);
return updateEntities(entitiesArray);
}
@@ -625,9 +691,10 @@ public class AtlasClient {
* @param attribute property key
* @param value property value
*/
- public void updateEntityAttribute(final String guid, final String attribute, String value) throws AtlasServiceException {
+ public EntityResult updateEntityAttribute(final String guid, final String attribute, String value)
+ throws AtlasServiceException {
LOG.debug("Updating entity id: {}, attribute name: {}, attribute value: {}", guid, attribute, value);
- callAPIWithRetries(API.UPDATE_ENTITY_PARTIAL, value, new ResourceCreator() {
+ JSONObject response = callAPIWithRetries(API.UPDATE_ENTITY_PARTIAL, value, new ResourceCreator() {
@Override
public WebResource createResource() {
API api = API.UPDATE_ENTITY_PARTIAL;
@@ -636,6 +703,7 @@ public class AtlasClient {
return resource;
}
});
+ return extractEntityResult(response);
}
@VisibleForTesting
@@ -665,10 +733,11 @@ public class AtlasClient {
* @param guid guid
* @param entity entity definition
*/
- public void updateEntity(String guid, Referenceable entity) throws AtlasServiceException {
+ public EntityResult updateEntity(String guid, Referenceable entity) throws AtlasServiceException {
String entityJson = InstanceSerialization.toJson(entity, true);
LOG.debug("Updating entity id {} with {}", guid, entityJson);
- callAPI(API.UPDATE_ENTITY_PARTIAL, entityJson, guid);
+ JSONObject response = callAPI(API.UPDATE_ENTITY_PARTIAL, entityJson, guid);
+ return extractEntityResult(response);
}
/**
@@ -691,8 +760,9 @@ public class AtlasClient {
* @param uniqueAttributeValue Attribute Value that uniquely identifies the entity
* @param entity entity definition
*/
- public String updateEntity(final String entityType, final String uniqueAttributeName, final String uniqueAttributeValue,
- Referenceable entity) throws AtlasServiceException {
+ public EntityResult updateEntity(final String entityType, final String uniqueAttributeName,
+ final String uniqueAttributeValue,
+ Referenceable entity) throws AtlasServiceException {
final API api = API.UPDATE_ENTITY_PARTIAL;
String entityJson = InstanceSerialization.toJson(entity, true);
LOG.debug("Updating entity type: {}, attributeName: {}, attributeValue: {}, entity: {}", entityType,
@@ -707,7 +777,7 @@ public class AtlasClient {
return resource;
}
});
- String result = getString(response, GUID);
+ EntityResult result = extractEntityResult(response);
LOG.debug("Update entity returned result: {}", result);
return result;
}
@@ -724,10 +794,10 @@ public class AtlasClient {
* Delete the specified entities from the repository
*
* @param guids guids of entities to delete
- * @return List of deleted entity guids
+ * @return List of entity ids updated/deleted
* @throws AtlasServiceException
*/
- public List<String> deleteEntities(final String ... guids) throws AtlasServiceException {
+ public EntityResult deleteEntities(final String ... guids) throws AtlasServiceException {
LOG.debug("Deleting entities: {}", guids);
JSONObject jsonResponse = callAPIWithRetries(API.DELETE_ENTITIES, null, new ResourceCreator() {
@Override
@@ -740,7 +810,7 @@ public class AtlasClient {
return resource;
}
});
- List<String> results = extractResults(jsonResponse, GUID, new ExtractOperation<String, String>());
+ EntityResult results = extractEntityResult(jsonResponse);
LOG.debug("Delete entities returned results: {}", results);
return results;
}
@@ -750,9 +820,9 @@ public class AtlasClient {
* @param entityType Type of the entity being deleted
* @param uniqueAttributeName Attribute Name that uniquely identifies the entity
* @param uniqueAttributeValue Attribute Value that uniquely identifies the entity
- * @return List of deleted entity guids(including composite references from that entity)
+ * @return List of entity ids updated/deleted(including composite references from that entity)
*/
- public List<String> deleteEntity(String entityType, String uniqueAttributeName, String uniqueAttributeValue)
+ public EntityResult deleteEntity(String entityType, String uniqueAttributeName, String uniqueAttributeValue)
throws AtlasServiceException {
LOG.debug("Deleting entity type: {}, attributeName: {}, attributeValue: {}", entityType, uniqueAttributeName,
uniqueAttributeValue);
@@ -762,7 +832,7 @@ public class AtlasClient {
resource = resource.queryParam(ATTRIBUTE_NAME, uniqueAttributeName);
resource = resource.queryParam(ATTRIBUTE_VALUE, uniqueAttributeValue);
JSONObject jsonResponse = callAPIWithResource(API.DELETE_ENTITIES, resource, null);
- List<String> results = extractResults(jsonResponse, GUID, new ExtractOperation<String, String>());
+ EntityResult results = extractEntityResult(jsonResponse);
LOG.debug("Delete entities returned results: {}", results);
return results;
}
@@ -901,7 +971,7 @@ public class AtlasClient {
return extractResults(jsonResponse, AtlasClient.EVENTS, new ExtractOperation<EntityAuditEvent, JSONObject>() {
@Override
EntityAuditEvent extractElement(JSONObject element) throws JSONException {
- return EntityAuditEvent.GSON.fromJson(element.toString(), EntityAuditEvent.class);
+ return SerDe.GSON.fromJson(element.toString(), EntityAuditEvent.class);
}
});
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/client/src/main/java/org/apache/atlas/EntityAuditEvent.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/EntityAuditEvent.java b/client/src/main/java/org/apache/atlas/EntityAuditEvent.java
index 460f708..29a04ab 100644
--- a/client/src/main/java/org/apache/atlas/EntityAuditEvent.java
+++ b/client/src/main/java/org/apache/atlas/EntityAuditEvent.java
@@ -18,16 +18,14 @@
package org.apache.atlas;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
+import org.apache.atlas.typesystem.IReferenceableInstance;
+import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.commons.lang.StringUtils;
/**
* Structure of entity audit event
*/
public class EntityAuditEvent {
- public static final Gson GSON = new GsonBuilder().create();
-
public enum EntityAuditAction {
ENTITY_CREATE, ENTITY_UPDATE, ENTITY_DELETE, TAG_ADD, TAG_DELETE
}
@@ -38,16 +36,19 @@ public class EntityAuditEvent {
private EntityAuditAction action;
private String details;
private String eventKey;
+ private IReferenceableInstance entityDefinition;
public EntityAuditEvent() {
}
- public EntityAuditEvent(String entityId, Long ts, String user, EntityAuditAction action, String details) {
+ public EntityAuditEvent(String entityId, Long ts, String user, EntityAuditAction action, String details,
+ IReferenceableInstance entityDefinition) throws AtlasException {
this.entityId = entityId;
this.timestamp = ts;
this.user = user;
this.action = action;
this.details = details;
+ this.entityDefinition = entityDefinition;
}
@Override
@@ -62,10 +63,12 @@ public class EntityAuditEvent {
EntityAuditEvent otherEvent = (EntityAuditEvent) other;
return StringUtils.equals(entityId, otherEvent.entityId) &&
- (timestamp == otherEvent.timestamp) &&
- StringUtils.equals(user, otherEvent.user) && (action == otherEvent.action) &&
- StringUtils.equals(details, otherEvent.details) &&
- StringUtils.equals(eventKey, otherEvent.eventKey);
+ (timestamp == otherEvent.timestamp) &&
+ StringUtils.equals(user, otherEvent.user) &&
+ (action == otherEvent.action) &&
+ StringUtils.equals(details, otherEvent.details) &&
+ StringUtils.equals(eventKey, otherEvent.eventKey) &&
+ StringUtils.equals(getEntityDefinitionString(), otherEvent.getEntityDefinitionString());
}
@Override
@@ -75,11 +78,11 @@ public class EntityAuditEvent {
@Override
public String toString() {
- return GSON.toJson(this);
+ return SerDe.GSON.toJson(this);
}
public static EntityAuditEvent fromString(String eventString) {
- return GSON.fromJson(eventString, EntityAuditEvent.class);
+ return SerDe.GSON.fromJson(eventString, EntityAuditEvent.class);
}
public String getEntityId() {
@@ -129,4 +132,19 @@ public class EntityAuditEvent {
public void setEventKey(String eventKey) {
this.eventKey = eventKey;
}
+
+ public IReferenceableInstance getEntityDefinition() {
+ return entityDefinition;
+ }
+
+ public String getEntityDefinitionString() {
+ if (entityDefinition != null) {
+ return InstanceSerialization.toJson(entityDefinition, true);
+ }
+ return null;
+ }
+
+ public void setEntityDefinition(String entityDefinition) {
+ this.entityDefinition = InstanceSerialization.fromJsonReferenceable(entityDefinition, true);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/client/src/main/java/org/apache/atlas/SerDe.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/SerDe.java b/client/src/main/java/org/apache/atlas/SerDe.java
new file mode 100644
index 0000000..6b7478a
--- /dev/null
+++ b/client/src/main/java/org/apache/atlas/SerDe.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+import org.apache.atlas.typesystem.IReferenceableInstance;
+import org.apache.atlas.typesystem.IStruct;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.Struct;
+import org.apache.atlas.typesystem.json.InstanceSerialization;
+
+import java.lang.reflect.Type;
+
+public class SerDe {
+ public static final Gson GSON = new GsonBuilder().
+ registerTypeAdapter(IStruct.class, new StructDeserializer()).
+ registerTypeAdapter(IReferenceableInstance.class, new ReferenceableSerializerDeserializer()).
+ registerTypeAdapter(Referenceable.class, new ReferenceableSerializerDeserializer()).
+ create();
+
+ /**
+ * Serde for Struct used by AbstractNotificationConsumer.GSON.
+ */
+ public static final class StructDeserializer implements JsonDeserializer<IStruct>, JsonSerializer<IStruct> {
+ @Override
+ public IStruct deserialize(final JsonElement json, final Type type,
+ final JsonDeserializationContext context) {
+ return context.deserialize(json, Struct.class);
+ }
+
+ @Override
+ public JsonElement serialize(IStruct src, Type typeOfSrc, JsonSerializationContext context) {
+ String instanceJson = InstanceSerialization.toJson(src, true);
+ return new JsonParser().parse(instanceJson).getAsJsonObject();
+ }
+ }
+
+ /**
+ * Serde for Referenceable used by AbstractNotificationConsumer.GSON.
+ */
+ public static final class ReferenceableSerializerDeserializer implements JsonDeserializer<IStruct>,
+ JsonSerializer<IReferenceableInstance> {
+ @Override
+ public IReferenceableInstance deserialize(final JsonElement json, final Type type,
+ final JsonDeserializationContext context) {
+
+ return InstanceSerialization.fromJsonReferenceable(json.toString(), true);
+ }
+
+ @Override
+ public JsonElement serialize(IReferenceableInstance src, Type typeOfSrc, JsonSerializationContext context) {
+ String instanceJson = InstanceSerialization.toJson(src, true);
+ return new JsonParser().parse(instanceJson).getAsJsonObject();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/client/src/test/java/org/apache/atlas/AtlasClientTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/atlas/AtlasClientTest.java b/client/src/test/java/org/apache/atlas/AtlasClientTest.java
index 0e80573..77a387f 100644
--- a/client/src/test/java/org/apache/atlas/AtlasClientTest.java
+++ b/client/src/test/java/org/apache/atlas/AtlasClientTest.java
@@ -21,8 +21,12 @@ import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
+import org.codehaus.jettison.json.JSONObject;
+import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
@@ -33,9 +37,12 @@ import javax.ws.rs.core.UriBuilder;
import java.net.ConnectException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.List;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -76,6 +83,25 @@ public class AtlasClientTest {
assertTrue(atlasClient.isServerReady());
}
+ @Test
+ public void testCreateEntity() throws Exception {
+ setupRetryParams();
+ AtlasClient atlasClient = new AtlasClient(service, configuration);
+
+ WebResource.Builder builder = setupBuilder(AtlasClient.API.CREATE_ENTITY, service);
+ ClientResponse response = mock(ClientResponse.class);
+ when(response.getStatus()).thenReturn(Response.Status.CREATED.getStatusCode());
+
+ JSONObject jsonResponse = new JSONObject(new AtlasClient.EntityResult(Arrays.asList("id"), null, null).toString());
+ when(response.getEntity(String.class)).thenReturn(jsonResponse.toString());
+ String entityJson = InstanceSerialization.toJson(new Referenceable("type"), true);
+ when(builder.method(anyString(), Matchers.<Class>any(), anyString())).thenReturn(response);
+
+ List<String> ids = atlasClient.createEntity(entityJson);
+ assertEquals(ids.size(), 1);
+ assertEquals(ids.get(0), "id");
+ }
+
private WebResource.Builder setupBuilder(AtlasClient.API api, WebResource webResource) {
when(webResource.path(api.getPath())).thenReturn(service);
WebResource.Builder builder = getBuilder(service);
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java b/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java
index 3f16a9a..6ef407a 100644
--- a/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java
+++ b/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java
@@ -97,6 +97,11 @@ public class MessageVersion implements Comparable<MessageVersion> {
}
+ @Override
+ public String toString() {
+ return "MessageVersion[version=" + version + "]";
+ }
+
// ----- helper methods --------------------------------------------------
/**
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
index 384f383..ef8ee27 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
@@ -17,11 +17,11 @@
*/
package org.apache.atlas.notification;
+import com.google.gson.reflect.TypeToken;
import org.apache.atlas.notification.entity.EntityMessageDeserializer;
import org.apache.atlas.notification.entity.EntityNotification;
import org.apache.atlas.notification.hook.HookMessageDeserializer;
import org.apache.atlas.notification.hook.HookNotification;
-import com.google.gson.reflect.TypeToken;
import java.lang.reflect.Type;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/notification/src/main/java/org/apache/atlas/notification/VersionedMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/VersionedMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/VersionedMessageDeserializer.java
index 290be59..cc2099e 100644
--- a/notification/src/main/java/org/apache/atlas/notification/VersionedMessageDeserializer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/VersionedMessageDeserializer.java
@@ -31,7 +31,7 @@ import java.lang.reflect.Type;
public abstract class VersionedMessageDeserializer<T> implements MessageDeserializer<T> {
public static final String VERSION_MISMATCH_MSG =
- "Notification message version mismatch. Expected %s but recieved %s";
+ "Notification message version mismatch. Expected %s but recieved %s. Message %s";
private final Type versionedMessageType;
private final MessageVersion expectedVersion;
@@ -90,18 +90,16 @@ public abstract class VersionedMessageDeserializer<T> implements MessageDeserial
// message has newer version
if (comp > 0) {
- String msg = String.format(VERSION_MISMATCH_MSG, expectedVersion, versionedMessage.getVersion());
+ String msg =
+ String.format(VERSION_MISMATCH_MSG, expectedVersion, versionedMessage.getVersion(), messageJson);
notificationLogger.error(msg);
- notificationLogger.info(messageJson);
throw new IncompatibleVersionException(msg);
}
// message has older version
if (comp < 0) {
- notificationLogger.info(
- String.format(VERSION_MISMATCH_MSG, expectedVersion, versionedMessage.getVersion()));
-
- notificationLogger.info(messageJson);
+ notificationLogger.info(String.format(VERSION_MISMATCH_MSG, expectedVersion, versionedMessage.getVersion(),
+ messageJson));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java b/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java
deleted file mode 100644
index 300cbb5..0000000
--- a/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.notification.entity;
-
-import com.google.inject.Inject;
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.listener.EntityChangeListener;
-import org.apache.atlas.notification.NotificationInterface;
-import org.apache.atlas.typesystem.IReferenceableInstance;
-import org.apache.atlas.typesystem.IStruct;
-import org.apache.atlas.typesystem.ITypedReferenceableInstance;
-import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.types.TypeSystem;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Listen to the repository for entity changes and produce entity change notifications.
- */
-public class NotificationEntityChangeListener implements EntityChangeListener {
-
- private final NotificationInterface notificationInterface;
- private final TypeSystem typeSystem;
-
-
- // ----- Constructors ------------------------------------------------------
-
- /**
- * Construct a NotificationEntityChangeListener.
- *
- * @param notificationInterface the notification framework interface
- * @param typeSystem the Atlas type system
- */
- @Inject
- public NotificationEntityChangeListener(NotificationInterface notificationInterface, TypeSystem typeSystem) {
- this.notificationInterface = notificationInterface;
- this.typeSystem = typeSystem;
- }
-
-
- // ----- EntityChangeListener ----------------------------------------------
-
- @Override
- public void onEntitiesAdded(Collection<ITypedReferenceableInstance> entities) throws AtlasException {
- notifyOfEntityEvent(entities, EntityNotification.OperationType.ENTITY_CREATE);
- }
-
- @Override
- public void onEntitiesUpdated(Collection<ITypedReferenceableInstance> entities) throws AtlasException {
- notifyOfEntityEvent(entities, EntityNotification.OperationType.ENTITY_UPDATE);
- }
-
- @Override
- public void onTraitAdded(ITypedReferenceableInstance entity, IStruct trait) throws AtlasException {
- notifyOfEntityEvent(Collections.singleton(entity), EntityNotification.OperationType.TRAIT_ADD);
- }
-
- @Override
- public void onTraitDeleted(ITypedReferenceableInstance entity, String traitName) throws AtlasException {
- notifyOfEntityEvent(Collections.singleton(entity), EntityNotification.OperationType.TRAIT_DELETE);
- }
-
- @Override
- public void onEntitiesDeleted(Collection<ITypedReferenceableInstance> entities) throws AtlasException {
- notifyOfEntityEvent(entities, EntityNotification.OperationType.ENTITY_DELETE);
- }
-
-
- // ----- helper methods -------------------------------------------------
-
-
- // send notification of entity change
- private void notifyOfEntityEvent(Collection<ITypedReferenceableInstance> entityDefinitions,
- EntityNotification.OperationType operationType) throws AtlasException {
- List<EntityNotification> messages = new LinkedList<>();
-
- for (IReferenceableInstance entityDefinition : entityDefinitions) {
- Referenceable entity = new Referenceable(entityDefinition);
-
- EntityNotificationImpl notification =
- new EntityNotificationImpl(entity, operationType, typeSystem);
-
- messages.add(notification);
- }
-
- notificationInterface.send(NotificationInterface.NotificationType.ENTITIES, messages);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
index e8b55ef..0c8990f 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
@@ -27,9 +27,13 @@ import java.lang.reflect.Type;
import java.util.LinkedList;
import java.util.List;
+import static org.mockito.Matchers.endsWith;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
-import static org.testng.Assert.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
/**
* AbstractNotificationConsumer tests.
@@ -110,17 +114,17 @@ public class AbstractNotificationConsumerTest {
assertTrue(consumer.hasNext());
assertEquals(new TestMessage("sValue2", 98), consumer.next());
- verify(logger).info(json2);
+ verify(logger).info(endsWith(json2));
assertTrue(consumer.hasNext());
assertEquals(new TestMessage("sValue3", 97), consumer.next());
- verify(logger).info(json3);
+ verify(logger).info(endsWith(json3));
assertTrue(consumer.hasNext());
assertEquals(new TestMessage("sValue4", 96), consumer.next());
- verify(logger).info(json4);
+ verify(logger).info(endsWith(json4));
assertFalse(consumer.hasNext());
}
@@ -154,7 +158,7 @@ public class AbstractNotificationConsumerTest {
consumer.next();
fail("Expected VersionMismatchException!");
} catch (IncompatibleVersionException e) {
- verify(logger).info(json2);
+ verify(logger).error(endsWith(json2));
}
assertFalse(consumer.hasNext());
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index fd17292..0402b49 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -3,6 +3,7 @@ Apache Atlas Release Notes
--trunk - unreleased
INCOMPATIBLE CHANGES:
+ATLAS-716 Entity update/delete notifications (shwethags)
ATLAS-619 Canonicalize hive queries (sumasai)
ATLAS-497 Simple Authorization (saqeeb.s via yhemanth)
ATLAS-661 REST API Authentication (nixonrodrigues via yhemanth)
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/repository/src/main/java/org/apache/atlas/repository/MetadataRepository.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/MetadataRepository.java b/repository/src/main/java/org/apache/atlas/repository/MetadataRepository.java
index 540c308..43e9f85 100755
--- a/repository/src/main/java/org/apache/atlas/repository/MetadataRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/MetadataRepository.java
@@ -18,6 +18,7 @@
package org.apache.atlas.repository;
+import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.atlas.typesystem.ITypedStruct;
@@ -26,7 +27,6 @@ import org.apache.atlas.typesystem.exception.EntityNotFoundException;
import org.apache.atlas.typesystem.exception.TraitNotFoundException;
import org.apache.atlas.typesystem.types.AttributeInfo;
import org.apache.atlas.typesystem.types.IDataType;
-import org.apache.atlas.typesystem.types.TypeUtils;
import java.util.List;
@@ -111,7 +111,7 @@ public interface MetadataRepository {
* @return guids of deleted entities
* @throws RepositoryException
*/
- TypeUtils.Pair<List<String>, List<ITypedReferenceableInstance>> deleteEntities(List<String> guids) throws RepositoryException;
+ AtlasClient.EntityResult deleteEntities(List<String> guids) throws RepositoryException;
// Trait management functions
@@ -147,13 +147,13 @@ public interface MetadataRepository {
* Adds/Updates the property to the entity that corresponds to the GUID
* Supports only primitive attribute/Class Id updations.
*/
- TypeUtils.Pair<List<String>, List<String>> updatePartial(ITypedReferenceableInstance entity) throws RepositoryException;
+ AtlasClient.EntityResult updatePartial(ITypedReferenceableInstance entity) throws RepositoryException;
/**
* Adds the property to the entity that corresponds to the GUID
* @param entitiesToBeUpdated The entities to be updated
*/
- TypeUtils.Pair<List<String>, List<String>> updateEntities(ITypedReferenceableInstance... entitiesToBeUpdated) throws RepositoryException;
+ AtlasClient.EntityResult updateEntities(ITypedReferenceableInstance... entitiesToBeUpdated) throws RepositoryException;
/**
* Returns the entity for the given type and qualified name
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
index 5b4bdbf..958ecaf 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
@@ -55,8 +55,9 @@ public class EntityAuditListener implements EntityChangeListener {
}
private EntityAuditEvent createEvent(ITypedReferenceableInstance entity, long ts,
- EntityAuditEvent.EntityAuditAction action, String details) {
- return new EntityAuditEvent(entity.getId()._getId(), ts, RequestContext.get().getUser(), action, details);
+ EntityAuditEvent.EntityAuditAction action, String details)
+ throws AtlasException {
+ return new EntityAuditEvent(entity.getId()._getId(), ts, RequestContext.get().getUser(), action, details, entity);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
index 8f11322..22d71df 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
@@ -78,6 +78,7 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
public static final byte[] COLUMN_ACTION = Bytes.toBytes("action");
public static final byte[] COLUMN_DETAIL = Bytes.toBytes("detail");
public static final byte[] COLUMN_USER = Bytes.toBytes("user");
+ public static final byte[] COLUMN_DEFINITION = Bytes.toBytes("def");
private TableName tableName;
private Connection connection;
@@ -110,6 +111,7 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
addColumn(put, COLUMN_ACTION, event.getAction());
addColumn(put, COLUMN_USER, event.getUser());
addColumn(put, COLUMN_DETAIL, event.getDetails());
+ addColumn(put, COLUMN_DEFINITION, event.getEntityDefinitionString());
puts.add(put);
}
table.put(puts);
@@ -183,6 +185,7 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
event.setUser(getResultString(result, COLUMN_USER));
event.setAction(EntityAuditEvent.EntityAuditAction.valueOf(getResultString(result, COLUMN_ACTION)));
event.setDetails(getResultString(result, COLUMN_DETAIL));
+ event.setEntityDefinition(getResultString(result, COLUMN_DEFINITION));
events.add(event);
}
LOG.info("Got events for entity id {}, starting timestamp {}, #records {}", entityId, startKey, events.size());
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/repository/src/main/java/org/apache/atlas/repository/graph/DeleteHandler.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/DeleteHandler.java b/repository/src/main/java/org/apache/atlas/repository/graph/DeleteHandler.java
index a9e4f39..91f9bd0 100644
--- a/repository/src/main/java/org/apache/atlas/repository/graph/DeleteHandler.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/DeleteHandler.java
@@ -47,15 +47,16 @@ import static org.apache.atlas.repository.graph.GraphHelper.string;
public abstract class DeleteHandler {
public static final Logger LOG = LoggerFactory.getLogger(DeleteHandler.class);
- private static final GraphHelper graphHelper = GraphHelper.getInstance();
+ protected static final GraphHelper graphHelper = GraphHelper.getInstance();
protected TypeSystem typeSystem;
private boolean shouldUpdateReverseAttribute;
+ private boolean softDelete;
- public DeleteHandler(TypeSystem typeSystem, boolean shouldUpdateReverseAttribute) {
+ public DeleteHandler(TypeSystem typeSystem, boolean shouldUpdateReverseAttribute, boolean softDelete) {
this.typeSystem = typeSystem;
this.shouldUpdateReverseAttribute = shouldUpdateReverseAttribute;
-
+ this.softDelete = softDelete;
}
/**
@@ -64,16 +65,22 @@ public abstract class DeleteHandler {
* @throws AtlasException
*/
public void deleteEntity(Vertex instanceVertex) throws AtlasException {
+ RequestContext requestContext = RequestContext.get();
String guid = GraphHelper.getIdFromVertex(instanceVertex);
+ Id.EntityState state = GraphHelper.getState(instanceVertex);
+ if (requestContext.getDeletedEntityIds().contains(guid) || state == Id.EntityState.DELETED) {
+ LOG.debug("Skipping deleting {} as its already deleted", guid);
+ return;
+ }
String typeName = GraphHelper.getTypeName(instanceVertex);
- RequestContext.get().recordDeletedEntity(guid, typeName);
+ requestContext.recordEntityDelete(guid, typeName);
deleteAllTraits(instanceVertex);
- deleteTypeVertex(instanceVertex);
+ deleteTypeVertex(instanceVertex, false);
}
- protected abstract void deleteEdge(Edge edge) throws AtlasException;
+ protected abstract void deleteEdge(Edge edge, boolean force) throws AtlasException;
/**
* Deletes a type vertex - can be entity(class type) or just vertex(struct/trait type)
@@ -81,11 +88,11 @@ public abstract class DeleteHandler {
* @param typeCategory
* @throws AtlasException
*/
- protected void deleteTypeVertex(Vertex instanceVertex, DataTypes.TypeCategory typeCategory) throws AtlasException {
+ protected void deleteTypeVertex(Vertex instanceVertex, DataTypes.TypeCategory typeCategory, boolean force) throws AtlasException {
switch (typeCategory) {
case STRUCT:
case TRAIT:
- deleteTypeVertex(instanceVertex);
+ deleteTypeVertex(instanceVertex, force);
break;
case CLASS:
@@ -102,7 +109,7 @@ public abstract class DeleteHandler {
* @param instanceVertex
* @throws AtlasException
*/
- protected void deleteTypeVertex(Vertex instanceVertex) throws AtlasException {
+ protected void deleteTypeVertex(Vertex instanceVertex, boolean force) throws AtlasException {
LOG.debug("Deleting {}", string(instanceVertex));
String typeName = GraphHelper.getTypeName(instanceVertex);
IDataType type = typeSystem.getDataType(IDataType.class, typeName);
@@ -115,12 +122,12 @@ public abstract class DeleteHandler {
switch (attributeInfo.dataType().getTypeCategory()) {
case CLASS:
//If its class attribute, delete the reference
- deleteReference(instanceVertex, edgeLabel, DataTypes.TypeCategory.CLASS, attributeInfo.isComposite);
+ deleteEdgeReference(instanceVertex, edgeLabel, DataTypes.TypeCategory.CLASS, attributeInfo.isComposite);
break;
case STRUCT:
//If its struct attribute, delete the reference
- deleteReference(instanceVertex, edgeLabel, DataTypes.TypeCategory.STRUCT);
+ deleteEdgeReference(instanceVertex, edgeLabel, DataTypes.TypeCategory.STRUCT, false);
break;
case ARRAY:
@@ -133,7 +140,7 @@ public abstract class DeleteHandler {
if (edges != null) {
while (edges.hasNext()) {
Edge edge = edges.next();
- deleteReference(edge, elementType, attributeInfo);
+ deleteEdgeReference(edge, elementType.getTypeCategory(), attributeInfo.isComposite, false);
}
}
}
@@ -151,22 +158,31 @@ public abstract class DeleteHandler {
if (keys != null) {
for (String key : keys) {
String mapEdgeLabel = GraphHelper.getQualifiedNameForMapKey(edgeLabel, key);
- deleteReference(instanceVertex, mapEdgeLabel, valueTypeCategory, attributeInfo.isComposite);
+ deleteEdgeReference(instanceVertex, mapEdgeLabel, valueTypeCategory, attributeInfo.isComposite);
}
}
}
}
}
- deleteVertex(instanceVertex, type.getTypeCategory());
+ deleteVertex(instanceVertex, force);
}
- public void deleteReference(Edge edge, IDataType dataType, AttributeInfo attributeInfo) throws AtlasException {
- deleteReference(edge, dataType.getTypeCategory(), attributeInfo.isComposite);
- }
-
- public void deleteReference(Edge edge, DataTypes.TypeCategory typeCategory, boolean isComposite) throws AtlasException {
+ /**
+ * Force delete is used to remove struct/trait in case of entity updates
+ * @param edge
+ * @param typeCategory
+ * @param isComposite
+ * @param forceDeleteStructTrait
+ * @return returns true if the edge reference is hard deleted
+ * @throws AtlasException
+ */
+ public boolean deleteEdgeReference(Edge edge, DataTypes.TypeCategory typeCategory, boolean isComposite,
+ boolean forceDeleteStructTrait) throws AtlasException {
LOG.debug("Deleting {}", string(edge));
+ boolean forceDelete =
+ (typeCategory == DataTypes.TypeCategory.STRUCT || typeCategory == DataTypes.TypeCategory.TRAIT)
+ ? forceDeleteStructTrait : false;
if (typeCategory == DataTypes.TypeCategory.STRUCT || typeCategory == DataTypes.TypeCategory.TRAIT
|| (typeCategory == DataTypes.TypeCategory.CLASS && isComposite)) {
//If the vertex is of type struct/trait, delete the edge and then the reference vertex as the vertex is not shared by any other entities.
@@ -175,32 +191,28 @@ public abstract class DeleteHandler {
Vertex vertexForDelete = edge.getVertex(Direction.IN);
//If deleting the edge and then the in vertex, reverse attribute shouldn't be updated
- deleteEdge(edge, false);
- deleteTypeVertex(vertexForDelete, typeCategory);
+ deleteEdge(edge, false, forceDelete);
+ deleteTypeVertex(vertexForDelete, typeCategory, forceDelete);
} else {
//If the vertex is of type class, and its not a composite attributes, the reference vertex' lifecycle is not controlled
//through this delete. Hence just remove the reference edge. Leave the reference vertex as is
//If deleting just the edge, reverse attribute should be updated for any references
//For example, for the department type system, if the person's manager edge is deleted, subordinates of manager should be updated
- deleteEdge(edge, true);
+ deleteEdge(edge, true, false);
}
+ return !softDelete || forceDelete;
}
- public void deleteReference(Vertex instanceVertex, String edgeLabel, DataTypes.TypeCategory typeCategory)
- throws AtlasException {
- deleteReference(instanceVertex, edgeLabel, typeCategory, false);
- }
-
- public void deleteReference(Vertex instanceVertex, String edgeLabel, DataTypes.TypeCategory typeCategory,
- boolean isComposite) throws AtlasException {
- Edge edge = GraphHelper.getEdgeForLabel(instanceVertex, edgeLabel);
+ public void deleteEdgeReference(Vertex outVertex, String edgeLabel, DataTypes.TypeCategory typeCategory,
+ boolean isComposite) throws AtlasException {
+ Edge edge = GraphHelper.getEdgeForLabel(outVertex, edgeLabel);
if (edge != null) {
- deleteReference(edge, typeCategory, isComposite);
+ deleteEdgeReference(edge, typeCategory, isComposite, false);
}
}
- protected void deleteEdge(Edge edge, boolean updateReverseAttribute) throws AtlasException {
+ protected void deleteEdge(Edge edge, boolean updateReverseAttribute, boolean force) throws AtlasException {
//update reverse attribute
if (updateReverseAttribute) {
AttributeInfo attributeInfo = getAttributeForEdge(edge.getLabel());
@@ -210,28 +222,28 @@ public abstract class DeleteHandler {
}
}
- deleteEdge(edge);
+ deleteEdge(edge, force);
}
- protected void deleteVertex(Vertex instanceVertex, DataTypes.TypeCategory typeCategory) throws AtlasException {
+ protected void deleteVertex(Vertex instanceVertex, boolean force) throws AtlasException {
//Update external references(incoming edges) to this vertex
LOG.debug("Setting the external references to {} to null(removing edges)", string(instanceVertex));
Iterator<Edge> edges = instanceVertex.getEdges(Direction.IN).iterator();
while(edges.hasNext()) {
Edge edge = edges.next();
- String edgeState = edge.getProperty(Constants.STATE_PROPERTY_KEY);
- if (Id.EntityState.ACTIVE.name().equals(edgeState)) {
+ Id.EntityState edgeState = GraphHelper.getState(edge);
+ if (edgeState == Id.EntityState.ACTIVE) {
//Delete only the active edge references
AttributeInfo attribute = getAttributeForEdge(edge.getLabel());
+ //TODO use delete edge instead??
deleteEdgeBetweenVertices(edge.getVertex(Direction.OUT), edge.getVertex(Direction.IN), attribute.name);
- deleteEdge(edge);
}
}
- _deleteVertex(instanceVertex);
+ _deleteVertex(instanceVertex, force);
}
- protected abstract void _deleteVertex(Vertex instanceVertex);
+ protected abstract void _deleteVertex(Vertex instanceVertex, boolean force);
/**
* Deletes the edge between outvertex and inVertex. The edge is for attribute attributeName of outVertex
@@ -245,7 +257,8 @@ public abstract class DeleteHandler {
attributeName);
String typeName = GraphHelper.getTypeName(outVertex);
String outId = GraphHelper.getIdFromVertex(outVertex);
- if (outId != null && RequestContext.get().isDeletedEntity(outId)) {
+ Id.EntityState state = GraphHelper.getState(outVertex);
+ if ((outId != null && RequestContext.get().isDeletedEntity(outId)) || state == Id.EntityState.DELETED) {
//If the reference vertex is marked for deletion, skip updating the reference
return;
}
@@ -261,8 +274,10 @@ public abstract class DeleteHandler {
//If its class attribute, its the only edge between two vertices
if (attributeInfo.multiplicity.nullAllowed()) {
edge = GraphHelper.getEdgeForLabel(outVertex, edgeLabel);
- }
- else {
+ if (shouldUpdateReverseAttribute) {
+ GraphHelper.setProperty(outVertex, propertyName, null);
+ }
+ } else {
// Cannot unset a required attribute.
throw new NullRequiredAttributeException("Cannot unset required attribute " + GraphHelper.getQualifiedFieldName(type, attributeName) +
" on " + string(outVertex) + " edge = " + edgeLabel);
@@ -275,23 +290,26 @@ public abstract class DeleteHandler {
if (elements != null) {
elements = new ArrayList<>(elements); //Make a copy, else list.remove reflects on titan.getProperty()
for (String elementEdgeId : elements) {
- Edge elementEdge = graphHelper.getEdgeById(elementEdgeId);
+ Edge elementEdge = graphHelper.getEdgeByEdgeId(outVertex, edgeLabel, elementEdgeId);
if (elementEdge == null) {
continue;
}
Vertex elementVertex = elementEdge.getVertex(Direction.IN);
if (elementVertex.getId().toString().equals(inVertex.getId().toString())) {
- if (attributeInfo.multiplicity.nullAllowed() || elements.size() > attributeInfo.multiplicity.lower) {
- edge = elementEdge;
- }
- else {
+ edge = elementEdge;
+
+ //TODO element.size includes deleted items as well. should exclude
+ if (!attributeInfo.multiplicity.nullAllowed()
+ && elements.size() <= attributeInfo.multiplicity.lower) {
// Deleting this edge would violate the attribute's lower bound.
throw new NullRequiredAttributeException(
- "Cannot remove array element from required attribute " +
- GraphHelper.getQualifiedFieldName(type, attributeName) + " on " + string(outVertex) + " " + string(elementEdge));
+ "Cannot remove array element from required attribute " +
+ GraphHelper.getQualifiedFieldName(type, attributeName) + " on "
+ + string(outVertex) + " " + string(elementEdge));
}
- if (shouldUpdateReverseAttribute || attributeInfo.isComposite) {
+
+ if (shouldUpdateReverseAttribute) {
//if composite attribute, remove the reference as well. else, just remove the edge
//for example, when table is deleted, process still references the table
//but when column is deleted, table will not reference the deleted column
@@ -299,8 +317,9 @@ public abstract class DeleteHandler {
attributeName);
elements.remove(elementEdge.getId().toString());
GraphHelper.setProperty(outVertex, propertyName, elements);
+ break;
+
}
- break;
}
}
}
@@ -312,11 +331,12 @@ public abstract class DeleteHandler {
if (keys != null) {
keys = new ArrayList<>(keys); //Make a copy, else list.remove reflects on titan.getProperty()
for (String key : keys) {
- String keyPropertyName = propertyName + "." + key;
+ String keyPropertyName = GraphHelper.getQualifiedNameForMapKey(propertyName, key);
String mapEdgeId = outVertex.getProperty(keyPropertyName);
- Edge mapEdge = graphHelper.getEdgeById(mapEdgeId);
+ Edge mapEdge = graphHelper.getEdgeByEdgeId(outVertex, keyPropertyName, mapEdgeId);
Vertex mapVertex = mapEdge.getVertex(Direction.IN);
if (mapVertex.getId().toString().equals(inVertex.getId().toString())) {
+ //TODO keys.size includes deleted items as well. should exclude
if (attributeInfo.multiplicity.nullAllowed() || keys.size() > attributeInfo.multiplicity.lower) {
edge = mapEdge;
}
@@ -327,7 +347,7 @@ public abstract class DeleteHandler {
GraphHelper.getQualifiedFieldName(type, attributeName) + " on " + string(outVertex) + " " + string(mapEdge));
}
- if (shouldUpdateReverseAttribute || attributeInfo.isComposite) {
+ if (shouldUpdateReverseAttribute) {
//remove this key
LOG.debug("Removing edge {}, key {} from the map attribute {}", string(mapEdge), key,
attributeName);
@@ -351,9 +371,11 @@ public abstract class DeleteHandler {
}
if (edge != null) {
- deleteEdge(edge);
+ deleteEdge(edge, false);
+ RequestContext requestContext = RequestContext.get();
GraphHelper.setProperty(outVertex, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY,
- RequestContext.get().getRequestTime());
+ requestContext.getRequestTime());
+ requestContext.recordEntityUpdate(outId);
}
}
@@ -389,7 +411,7 @@ public abstract class DeleteHandler {
for (String traitNameToBeDeleted : traitNames) {
String relationshipLabel = GraphHelper.getTraitLabel(typeName, traitNameToBeDeleted);
- deleteReference(instanceVertex, relationshipLabel, DataTypes.TypeCategory.TRAIT);
+ deleteEdgeReference(instanceVertex, relationshipLabel, DataTypes.TypeCategory.TRAIT, false);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
index 3604277..0d82d90 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
@@ -22,8 +22,10 @@ import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.thinkaurelius.titan.core.TitanGraph;
+import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.GraphQuery;
import com.tinkerpop.blueprints.Vertex;
+import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.GraphTransaction;
import org.apache.atlas.RequestContext;
@@ -40,7 +42,6 @@ import org.apache.atlas.typesystem.types.ClassType;
import org.apache.atlas.typesystem.types.DataTypes;
import org.apache.atlas.typesystem.types.IDataType;
import org.apache.atlas.typesystem.types.TypeSystem;
-import org.apache.atlas.typesystem.types.TypeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -258,8 +259,8 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
try {
final String entityTypeName = GraphHelper.getTypeName(instanceVertex);
String relationshipLabel = GraphHelper.getTraitLabel(entityTypeName, traitNameToBeDeleted);
-
- deleteHandler.deleteReference(instanceVertex, relationshipLabel, DataTypes.TypeCategory.TRAIT);
+ Edge edge = GraphHelper.getEdgeForLabel(instanceVertex, relationshipLabel);
+ deleteHandler.deleteEdgeReference(edge, DataTypes.TypeCategory.TRAIT, false, true);
// update the traits in entity once trait removal is successful
traitNames.remove(traitNameToBeDeleted);
@@ -284,14 +285,15 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
@Override
@GraphTransaction
- public TypeUtils.Pair<List<String>, List<String>> updateEntities(ITypedReferenceableInstance... entitiesUpdated) throws RepositoryException {
+ public AtlasClient.EntityResult updateEntities(ITypedReferenceableInstance... entitiesUpdated) throws RepositoryException {
LOG.info("updating entity {}", entitiesUpdated);
try {
TypedInstanceToGraphMapper instanceToGraphMapper = new TypedInstanceToGraphMapper(graphToInstanceMapper, deleteHandler);
instanceToGraphMapper.mapTypedInstanceToGraph(TypedInstanceToGraphMapper.Operation.UPDATE_FULL,
entitiesUpdated);
RequestContext requestContext = RequestContext.get();
- return TypeUtils.Pair.of(requestContext.getCreatedEntityIds(), requestContext.getUpdatedEntityIds());
+ return new AtlasClient.EntityResult(requestContext.getCreatedEntityIds(),
+ requestContext.getUpdatedEntityIds(), requestContext.getDeletedEntityIds());
} catch (AtlasException e) {
throw new RepositoryException(e);
}
@@ -299,13 +301,14 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
@Override
@GraphTransaction
- public TypeUtils.Pair<List<String>, List<String>> updatePartial(ITypedReferenceableInstance entity) throws RepositoryException {
+ public AtlasClient.EntityResult updatePartial(ITypedReferenceableInstance entity) throws RepositoryException {
LOG.info("updating entity {}", entity);
try {
TypedInstanceToGraphMapper instanceToGraphMapper = new TypedInstanceToGraphMapper(graphToInstanceMapper, deleteHandler);
instanceToGraphMapper.mapTypedInstanceToGraph(TypedInstanceToGraphMapper.Operation.UPDATE_PARTIAL, entity);
RequestContext requestContext = RequestContext.get();
- return TypeUtils.Pair.of(requestContext.getCreatedEntityIds(), requestContext.getUpdatedEntityIds());
+ return new AtlasClient.EntityResult(requestContext.getCreatedEntityIds(),
+ requestContext.getUpdatedEntityIds(), requestContext.getDeletedEntityIds());
} catch (AtlasException e) {
throw new RepositoryException(e);
}
@@ -313,7 +316,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
@Override
@GraphTransaction
- public TypeUtils.Pair<List<String>, List<ITypedReferenceableInstance>> deleteEntities(List<String> guids) throws RepositoryException {
+ public AtlasClient.EntityResult deleteEntities(List<String> guids) throws RepositoryException {
if (guids == null || guids.size() == 0) {
throw new IllegalArgumentException("guids must be non-null and non-empty");
@@ -337,6 +340,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
}
}
RequestContext requestContext = RequestContext.get();
- return new TypeUtils.Pair<>(requestContext.getDeletedEntityIds(), requestContext.getDeletedEntities());
+ return new AtlasClient.EntityResult(requestContext.getCreatedEntityIds(),
+ requestContext.getUpdatedEntityIds(), requestContext.getDeletedEntityIds());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
index cccafc2..4f6d011 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
@@ -107,11 +107,13 @@ public final class GraphHelper {
// add timestamp information
setProperty(vertexWithoutIdentity, Constants.TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
+ setProperty(vertexWithoutIdentity, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY,
+ RequestContext.get().getRequestTime());
return vertexWithoutIdentity;
}
- public Edge addEdge(Vertex fromVertex, Vertex toVertex, String edgeLabel) {
+ private Edge addEdge(Vertex fromVertex, Vertex toVertex, String edgeLabel) {
LOG.debug("Adding edge for {} -> label {} -> {}", string(fromVertex), edgeLabel, string(toVertex));
Edge edge = titanGraph.addEdge(null, fromVertex, toVertex, edgeLabel);
@@ -127,12 +129,34 @@ public final class GraphHelper {
Iterable<Edge> edges = inVertex.getEdges(Direction.IN, edgeLabel);
for (Edge edge : edges) {
if (edge.getVertex(Direction.OUT).getId().toString().equals(outVertex.getId().toString())) {
- return edge;
+ Id.EntityState edgeState = getState(edge);
+ if (edgeState == null || edgeState == Id.EntityState.ACTIVE) {
+ return edge;
+ }
}
}
return addEdge(outVertex, inVertex, edgeLabel);
}
+
+ public Edge getEdgeByEdgeId(Vertex outVertex, String edgeLabel, String edgeId) {
+ if (edgeId == null) {
+ return null;
+ }
+ return titanGraph.getEdge(edgeId);
+
+ //TODO get edge id is expensive. Use this logic. But doesn't work for now
+ /**
+ Iterable<Edge> edges = outVertex.getEdges(Direction.OUT, edgeLabel);
+ for (Edge edge : edges) {
+ if (edge.getId().toString().equals(edgeId)) {
+ return edge;
+ }
+ }
+ return null;
+ **/
+ }
+
/**
* Args of the format prop1, key1, prop2, key2...
* Searches for a vertex with prop1=key1 && prop2=key2
@@ -180,15 +204,14 @@ public final class GraphHelper {
* @return
*/
public static Edge getEdgeForLabel(Vertex vertex, String edgeLabel) {
- String vertexState = vertex.getProperty(Constants.STATE_PROPERTY_KEY);
-
Iterator<Edge> iterator = GraphHelper.getOutGoingEdgesByLabel(vertex, edgeLabel);
Edge latestDeletedEdge = null;
long latestDeletedEdgeTime = Long.MIN_VALUE;
+
while (iterator != null && iterator.hasNext()) {
Edge edge = iterator.next();
- String edgeState = edge.getProperty(Constants.STATE_PROPERTY_KEY);
- if (edgeState == null || Id.EntityState.ACTIVE.name().equals(edgeState)) {
+ Id.EntityState edgeState = getState(edge);
+ if (edgeState == null || edgeState == Id.EntityState.ACTIVE) {
LOG.debug("Found {}", string(edge));
return edge;
} else {
@@ -201,19 +224,8 @@ public final class GraphHelper {
}
//If the vertex is deleted, return latest deleted edge
- if (Id.EntityState.DELETED.equals(vertexState)) {
- LOG.debug("Found {}", string(latestDeletedEdge));
- return latestDeletedEdge;
- }
-
- return null;
- }
-
- public Edge getEdgeById(String edgeId) {
- if(edgeId != null) {
- return titanGraph.getEdge(edgeId);
- }
- return null;
+ LOG.debug("Found {}", latestDeletedEdge == null ? "null" : string(latestDeletedEdge));
+ return latestDeletedEdge;
}
public static String vertexString(final Vertex vertex) {
@@ -343,6 +355,15 @@ public final class GraphHelper {
return instanceVertex.getProperty(Constants.ENTITY_TYPE_PROPERTY_KEY);
}
+ public static Id.EntityState getState(Element element) {
+ String state = getStateAsString(element);
+ return state == null ? null : Id.EntityState.valueOf(state);
+ }
+
+ public static String getStateAsString(Element element) {
+ return element.getProperty(Constants.STATE_PROPERTY_KEY);
+ }
+
/**
* For the given type, finds an unique attribute and checks if there is an existing instance with the same
* unique value