You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/11/12 18:14:12 UTC
[03/42] atlas git commit: ATLAS-2251: Remove TypeSystem and related
implementation, to avoid unncessary duplicate of type details in cache
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/main/java/org/apache/atlas/web/resources/MetadataDiscoveryResource.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/MetadataDiscoveryResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/MetadataDiscoveryResource.java
index d7c66d3..ff1751d 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/MetadataDiscoveryResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/MetadataDiscoveryResource.java
@@ -22,8 +22,6 @@ import com.google.common.base.Preconditions;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.classification.InterfaceAudience;
-import org.apache.atlas.discovery.DiscoveryException;
-import org.apache.atlas.discovery.DiscoveryService;
import org.apache.atlas.query.QueryParams;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.atlas.utils.ParamChecker;
@@ -46,6 +44,7 @@ import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -65,8 +64,6 @@ public class MetadataDiscoveryResource {
private static final String QUERY_TYPE_FULLTEXT = "full-text";
private static final String LIMIT_OFFSET_DEFAULT = "-1";
- private final DiscoveryService discoveryService;
-
private final boolean gremlinSearchEnabled;
private static Configuration applicationProperties = null;
private static final String ENABLE_GREMLIN_SEARCH_PROPERTY = "atlas.search.gremlin.enable";
@@ -75,11 +72,10 @@ public class MetadataDiscoveryResource {
* Created by the Guice ServletModule and injected with the
* configured DiscoveryService.
*
- * @param discoveryService metadata service handle
+ * @param configuration configuration
*/
@Inject
- public MetadataDiscoveryResource(DiscoveryService discoveryService, Configuration configuration) {
- this.discoveryService = discoveryService;
+ public MetadataDiscoveryResource(Configuration configuration) {
applicationProperties = configuration;
gremlinSearchEnabled = applicationProperties != null && applicationProperties.getBoolean(ENABLE_GREMLIN_SEARCH_PROPERTY, false);
}
@@ -152,12 +148,12 @@ public class MetadataDiscoveryResource {
dslQuery = ParamChecker.notEmpty(dslQuery, "dslQuery cannot be null");
QueryParams queryParams = validateQueryParams(limit, offset);
- final String jsonResultStr = discoveryService.searchByDSL(dslQuery, queryParams);
+ final String jsonResultStr = ""; // TODO-typeSystem-removal: discoveryService.searchByDSL(dslQuery, queryParams);
JSONObject response = new DSLJSONResponseBuilder().results(jsonResultStr).query(dslQuery).build();
return Response.ok(response).build();
- } catch (DiscoveryException | IllegalArgumentException e) {
+ } catch (IllegalArgumentException e) {
LOG.error("Unable to get entity list for dslQuery {}", dslQuery, e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
} catch (WebApplicationException e) {
@@ -220,11 +216,11 @@ public class MetadataDiscoveryResource {
}
if (!gremlinSearchEnabled) {
- throw new DiscoveryException("Gremlin search is not enabled.");
+ throw new Exception("Gremlin search is not enabled.");
}
gremlinQuery = ParamChecker.notEmpty(gremlinQuery, "gremlinQuery cannot be null or empty");
- final List<Map<String, String>> results = discoveryService.searchByGremlin(gremlinQuery);
+ final List<Map<String, String>> results = new ArrayList<>(); // TODO-typeSystem-removal: discoveryService.searchByGremlin(gremlinQuery);
JSONObject response = new JSONObject();
response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
@@ -239,7 +235,7 @@ public class MetadataDiscoveryResource {
response.put(AtlasClient.COUNT, list.length());
return Response.ok(response).build();
- } catch (DiscoveryException | IllegalArgumentException e) {
+ } catch (IllegalArgumentException e) {
LOG.error("Unable to get entity list for gremlinQuery {}", gremlinQuery, e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
} catch (WebApplicationException e) {
@@ -284,12 +280,12 @@ public class MetadataDiscoveryResource {
query = ParamChecker.notEmpty(query, "query cannot be null or empty");
QueryParams queryParams = validateQueryParams(limit, offset);
- final String jsonResultStr = discoveryService.searchByFullText(query, queryParams);
+ final String jsonResultStr = ""; // TODO-typeSystem-removal: discoveryService.searchByFullText(query, queryParams);
JSONArray rowsJsonArr = new JSONArray(jsonResultStr);
JSONObject response = new FullTextJSonResponseBuilder().results(rowsJsonArr).query(query).build();
return Response.ok(response).build();
- } catch (DiscoveryException | IllegalArgumentException e) {
+ } catch (IllegalArgumentException e) {
LOG.error("Unable to get entity list for query {}", query, e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
} catch (WebApplicationException e) {
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/main/java/org/apache/atlas/web/resources/TypesResource.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/TypesResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/TypesResource.java
index a9c5509..9b2d7b2 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/TypesResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/TypesResource.java
@@ -23,10 +23,10 @@ import com.sun.jersey.api.core.ResourceContext;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.v1.model.typedef.TypesDef;
import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.atlas.typesystem.TypesDef;
-import org.apache.atlas.typesystem.json.TypesSerialization;
import org.apache.atlas.repository.converters.TypeConverterUtil;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.atlas.web.rest.TypesREST;
@@ -237,7 +237,7 @@ public class TypesResource {
try {
TypesDef typesDef = TypeConverterUtil.toTypesDef(typeRegistry.getType(typeName), typeRegistry);;
- String typeDefinition = TypesSerialization.toJson(typesDef);
+ String typeDefinition = AtlasType.toV1Json(typesDef);
response.put(AtlasClient.TYPENAME, typeName);
response.put(AtlasClient.DEFINITION, new JSONObject(typeDefinition));
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceState.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceState.java b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceState.java
index c6b4a6f..ea8b738 100644
--- a/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceState.java
+++ b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceState.java
@@ -33,10 +33,10 @@ import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
-import scala.actors.threadpool.Arrays;
import javax.inject.Inject;
import java.nio.charset.Charset;
+import java.util.Arrays;
import java.util.List;
/**
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/main/java/org/apache/atlas/web/util/LineageUtils.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/util/LineageUtils.java b/webapp/src/main/java/org/apache/atlas/web/util/LineageUtils.java
index ad2a697..562d9b7 100644
--- a/webapp/src/main/java/org/apache/atlas/web/util/LineageUtils.java
+++ b/webapp/src/main/java/org/apache/atlas/web/util/LineageUtils.java
@@ -23,13 +23,11 @@ import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.lineage.AtlasLineageInfo;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
-import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.repository.Constants;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.atlas.typesystem.Struct;
-import org.apache.atlas.typesystem.json.InstanceSerialization;
-import org.apache.atlas.typesystem.types.TypeSystem;
+import org.apache.atlas.v1.model.instance.Struct;
import java.util.ArrayList;
import java.util.HashMap;
@@ -38,21 +36,19 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_ARRAY_PREFIX;
-import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_ARRAY_SUFFIX;
public final class LineageUtils {
private LineageUtils() {}
- private static final String VERTICES_ATTR_NAME = "vertices";
- private static final String EDGES_ATTR_NAME = "edges";
private static final String VERTEX_ID_ATTR_NAME = "vertexId";
private static final String TEMP_STRUCT_ID_RESULT = "__IdType";
private static final AtomicInteger COUNTER = new AtomicInteger();
- public static String toLineageStruct(AtlasLineageInfo lineageInfo, AtlasTypeRegistry registry) throws AtlasBaseException {
- String ret = null;
+ public static Struct toLineageStruct(AtlasLineageInfo lineageInfo, AtlasTypeRegistry registry) throws AtlasBaseException {
+ Struct ret = new Struct();
+
+ ret.setTypeName(Constants.TEMP_STRUCT_NAME_PREFIX + COUNTER.getAndIncrement());
if (lineageInfo != null) {
Map<String, AtlasEntityHeader> entities = lineageInfo.getGuidEntityMap();
@@ -66,11 +62,10 @@ public final class LineageUtils {
if (isDataSet(entityHeader.getTypeName(), registry)) {
Map<String, Object> vertexIdMap = new HashMap<>();
- TypeSystem.IdType idType = TypeSystem.getInstance().getIdType();
- vertexIdMap.put(idType.idAttrName(), guid);
- vertexIdMap.put(idType.stateAttrName(), (entityHeader.getStatus() == AtlasEntity.Status.ACTIVE) ? "ACTIVE" : "DELETED");
- vertexIdMap.put(idType.typeNameAttrName(), entityHeader.getTypeName());
+ vertexIdMap.put(Constants.ATTRIBUTE_NAME_GUID, guid);
+ vertexIdMap.put(Constants.ATTRIBUTE_NAME_STATE, (entityHeader.getStatus() == AtlasEntity.Status.ACTIVE) ? "ACTIVE" : "DELETED");
+ vertexIdMap.put(Constants.ATTRIBUTE_NAME_TYPENAME, entityHeader.getTypeName());
Object qualifiedName = entityHeader.getAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME);
if (qualifiedName == null) {
@@ -106,11 +101,8 @@ public final class LineageUtils {
}
}
- Map<String, Object> map = new HashMap<>();
- map.put(VERTICES_ATTR_NAME, verticesMap);
- map.put(EDGES_ATTR_NAME, edgesMap);
-
- ret = InstanceSerialization.toJson(constructResultStruct(map, false), false);
+ ret.set("vertices", verticesMap);
+ ret.set("edges", edgesMap);
}
return ret;
@@ -121,7 +113,7 @@ public final class LineageUtils {
return new Struct(TEMP_STRUCT_ID_RESULT, values);
}
- return new Struct(org.apache.atlas.query.TypeUtils.TEMP_STRUCT_NAME_PREFIX() + COUNTER.getAndIncrement(), values);
+ return new Struct(Constants.TEMP_STRUCT_NAME_PREFIX + COUNTER.getAndIncrement(), values);
}
private static boolean isDataSet(String typeName, AtlasTypeRegistry registry) throws AtlasBaseException {
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/test/java/org/apache/atlas/examples/QuickStartIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/examples/QuickStartIT.java b/webapp/src/test/java/org/apache/atlas/examples/QuickStartIT.java
index 592c2a6..1b5e811 100644
--- a/webapp/src/test/java/org/apache/atlas/examples/QuickStartIT.java
+++ b/webapp/src/test/java/org/apache/atlas/examples/QuickStartIT.java
@@ -20,8 +20,8 @@ package org.apache.atlas.examples;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
-import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.atlas.v1.model.instance.Id;
+import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.web.integration.BaseResourceIT;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/test/java/org/apache/atlas/notification/AdaptiveWaiterTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/AdaptiveWaiterTest.java b/webapp/src/test/java/org/apache/atlas/notification/AdaptiveWaiterTest.java
index 3b4ba02..e65d678 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/AdaptiveWaiterTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/AdaptiveWaiterTest.java
@@ -26,7 +26,7 @@ public class AdaptiveWaiterTest {
private final int maxDuration = 100;
private final int minDuration = 5;
- private final int increment = 5;
+ private final int increment = 5;
private NotificationHookConsumer.AdaptiveWaiter waiter;
@BeforeClass
@@ -36,11 +36,13 @@ public class AdaptiveWaiterTest {
@Test
public void basicTest() {
- for (int i = 0; i < 20; i++) {
+ int pauseCount = 10;
+
+ for (int i = 0; i < pauseCount; i++) {
waiter.pause(new IllegalStateException());
}
- assertEquals(waiter.waitDuration, 95);
+ assertEquals(waiter.waitDuration, Math.min((pauseCount + 1) * minDuration, maxDuration)); // waiter.waitDuration will be set to wait time for next pause()
}
@Test
@@ -63,6 +65,6 @@ public class AdaptiveWaiterTest {
}
waiter.pause(new IllegalArgumentException());
- assertEquals(waiter.waitDuration, 5);
+ assertEquals(waiter.waitDuration, minDuration);
}
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
index 7e94330..486b30b 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
@@ -18,25 +18,21 @@
package org.apache.atlas.notification;
-import com.google.common.collect.ImmutableSet;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.kafka.NotificationProvider;
-import org.apache.atlas.notification.entity.EntityNotification;
-import org.apache.atlas.typesystem.IReferenceableInstance;
-import org.apache.atlas.typesystem.IStruct;
-import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.Struct;
-import org.apache.atlas.typesystem.json.InstanceSerialization;
-import org.apache.atlas.typesystem.json.TypesSerialization$;
-import org.apache.atlas.typesystem.persistence.Id;
-import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
-import org.apache.atlas.typesystem.types.TraitType;
-import org.apache.atlas.typesystem.types.utils.TypesUtil;
+import org.apache.atlas.notification.NotificationInterface.NotificationType;
+import org.apache.atlas.v1.model.instance.Id;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.v1.model.instance.Struct;
+import org.apache.atlas.v1.model.notification.EntityNotificationV1;
+import org.apache.atlas.v1.model.notification.EntityNotificationV1.OperationType;
+import org.apache.atlas.v1.model.typedef.*;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.v1.typesystem.types.utils.TypesUtil;
import org.apache.atlas.web.integration.BaseResourceIT;
import org.testng.annotations.BeforeClass;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
+
+import java.util.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
@@ -46,33 +42,35 @@ import static org.testng.Assert.assertTrue;
* Entity Notification Integration Tests.
*/
public class EntityNotificationIT extends BaseResourceIT {
-
- private final String DATABASE_NAME = "db" + randomString();
- private final String TABLE_NAME = "table" + randomString();
- private NotificationInterface notificationInterface = NotificationProvider.get();
- private Id tableId;
- private Id dbId;
- private String traitName;
- private NotificationConsumer notificationConsumer;
+ private final String DATABASE_NAME = "db" + randomString();
+ private final String TABLE_NAME = "table" + randomString();
+ private final NotificationInterface notificationInterface = NotificationProvider.get();
+ private Id tableId;
+ private Id dbId;
+ private String traitName;
+ private NotificationConsumer notificationConsumer;
@BeforeClass
public void setUp() throws Exception {
super.setUp();
+
createTypeDefinitionsV1();
+
Referenceable HiveDBInstance = createHiveDBInstanceBuiltIn(DATABASE_NAME);
+
dbId = createInstance(HiveDBInstance);
- notificationConsumer = notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1).get(0);
+ notificationConsumer = notificationInterface.createConsumers(NotificationType.ENTITIES, 1).get(0);
}
public void testCreateEntity() throws Exception {
Referenceable tableInstance = createHiveTableInstanceBuiltIn(DATABASE_NAME, TABLE_NAME, dbId);
+
tableId = createInstance(tableInstance);
final String guid = tableId._getId();
- waitForNotification(notificationConsumer, MAX_WAIT_TIME,
- newNotificationPredicate(EntityNotification.OperationType.ENTITY_CREATE, HIVE_TABLE_TYPE_BUILTIN, guid));
+ waitForNotification(notificationConsumer, MAX_WAIT_TIME, newNotificationPredicate(OperationType.ENTITY_CREATE, HIVE_TABLE_TYPE_BUILTIN, guid));
}
public void testUpdateEntity() throws Exception {
@@ -83,83 +81,83 @@ public class EntityNotificationIT extends BaseResourceIT {
atlasClientV1.updateEntityAttribute(guid, property, newValue);
- waitForNotification(notificationConsumer, MAX_WAIT_TIME,
- newNotificationPredicate(EntityNotification.OperationType.ENTITY_UPDATE, HIVE_TABLE_TYPE_BUILTIN, guid));
+ waitForNotification(notificationConsumer, MAX_WAIT_TIME, newNotificationPredicate(OperationType.ENTITY_UPDATE, HIVE_TABLE_TYPE_BUILTIN, guid));
}
public void testDeleteEntity() throws Exception {
- final String tableName = "table-" + randomString();
- final String dbName = "db-" + randomString();
- Referenceable HiveDBInstance = createHiveDBInstanceBuiltIn(dbName);
- Id dbId = createInstance(HiveDBInstance);
+ final String tableName = "table-" + randomString();
+ final String dbName = "db-" + randomString();
+ final Referenceable HiveDBInstance = createHiveDBInstanceBuiltIn(dbName);
+ final Id dbId = createInstance(HiveDBInstance);
+ final Referenceable tableInstance = createHiveTableInstanceBuiltIn(dbName, tableName, dbId);
+ final Id tableId = createInstance(tableInstance);
+ final String guid = tableId._getId();
- Referenceable tableInstance = createHiveTableInstanceBuiltIn(dbName, tableName, dbId);
- final Id tableId = createInstance(tableInstance);
- final String guid = tableId._getId();
-
- waitForNotification(notificationConsumer, MAX_WAIT_TIME,
- newNotificationPredicate(EntityNotification.OperationType.ENTITY_CREATE, HIVE_TABLE_TYPE_BUILTIN, guid));
+ waitForNotification(notificationConsumer, MAX_WAIT_TIME, newNotificationPredicate(OperationType.ENTITY_CREATE, HIVE_TABLE_TYPE_BUILTIN, guid));
final String name = (String) tableInstance.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME);
atlasClientV1.deleteEntity(HIVE_TABLE_TYPE_BUILTIN, AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
- waitForNotification(notificationConsumer, MAX_WAIT_TIME,
- newNotificationPredicate(EntityNotification.OperationType.ENTITY_DELETE, HIVE_TABLE_TYPE_BUILTIN, guid));
+ waitForNotification(notificationConsumer, MAX_WAIT_TIME, newNotificationPredicate(OperationType.ENTITY_DELETE, HIVE_TABLE_TYPE_BUILTIN, guid));
}
public void testAddTrait() throws Exception {
String superSuperTraitName = "SuperTrait" + randomString();
- createTrait(superSuperTraitName);
-
- String superTraitName = "SuperTrait" + randomString();
- createTrait(superTraitName, superSuperTraitName);
+ String superTraitName = "SuperTrait" + randomString();
traitName = "Trait" + randomString();
+
+ createTrait(superSuperTraitName);
+ createTrait(superTraitName, superSuperTraitName);
createTrait(traitName, superTraitName);
- Struct traitInstance = new Struct(traitName);
- String traitInstanceJSON = InstanceSerialization.toJson(traitInstance, true);
+ Struct traitInstance = new Struct(traitName);
+ String traitInstanceJSON = AtlasType.toV1Json(traitInstance);
+
LOG.debug("Trait instance = {}", traitInstanceJSON);
final String guid = tableId._getId();
atlasClientV1.addTrait(guid, traitInstance);
- EntityNotification entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME,
- newNotificationPredicate(EntityNotification.OperationType.TRAIT_ADD, HIVE_TABLE_TYPE_BUILTIN, guid));
+ EntityNotificationV1 entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME, newNotificationPredicate(OperationType.TRAIT_ADD, HIVE_TABLE_TYPE_BUILTIN, guid));
+
+ Referenceable entity = entityNotification.getEntity();
- IReferenceableInstance entity = entityNotification.getEntity();
- assertTrue(entity.getTraits().contains(traitName));
+ assertTrue(entity.getTraitNames().contains(traitName));
- List<IStruct> allTraits = entityNotification.getAllTraits();
+ List<Struct> allTraits = entityNotification.getAllTraits();
List<String> allTraitNames = new LinkedList<>();
- for (IStruct struct : allTraits) {
+ for (Struct struct : allTraits) {
allTraitNames.add(struct.getTypeName());
}
+
assertTrue(allTraitNames.contains(traitName));
assertTrue(allTraitNames.contains(superTraitName));
assertTrue(allTraitNames.contains(superSuperTraitName));
String anotherTraitName = "Trait" + randomString();
+
createTrait(anotherTraitName, superTraitName);
- traitInstance = new Struct(anotherTraitName);
- traitInstanceJSON = InstanceSerialization.toJson(traitInstance, true);
+ traitInstance = new Struct(anotherTraitName);
+ traitInstanceJSON = AtlasType.toV1Json(traitInstance);
+
LOG.debug("Trait instance = {}", traitInstanceJSON);
atlasClientV1.addTrait(guid, traitInstance);
- entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME,
- newNotificationPredicate(EntityNotification.OperationType.TRAIT_ADD, HIVE_TABLE_TYPE_BUILTIN, guid));
+ entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME, newNotificationPredicate(OperationType.TRAIT_ADD, HIVE_TABLE_TYPE_BUILTIN, guid));
- allTraits = entityNotification.getAllTraits();
+ allTraits = entityNotification.getAllTraits();
allTraitNames = new LinkedList<>();
- for (IStruct struct : allTraits) {
+ for (Struct struct : allTraits) {
allTraitNames.add(struct.getTypeName());
}
+
assertTrue(allTraitNames.contains(traitName));
assertTrue(allTraitNames.contains(anotherTraitName));
// verify that the super type shows up twice in all traits
@@ -171,21 +169,25 @@ public class EntityNotificationIT extends BaseResourceIT {
atlasClientV1.deleteTrait(guid, traitName);
- EntityNotification entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME,
- newNotificationPredicate(EntityNotification.OperationType.TRAIT_DELETE, HIVE_TABLE_TYPE_BUILTIN, guid));
+ EntityNotificationV1 entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME,
+ newNotificationPredicate(EntityNotificationV1.OperationType.TRAIT_DELETE, HIVE_TABLE_TYPE_BUILTIN, guid));
- assertFalse(entityNotification.getEntity().getTraits().contains(traitName));
+ assertFalse(entityNotification.getEntity().getTraitNames().contains(traitName));
}
// ----- helper methods ---------------------------------------------------
private void createTrait(String traitName, String ... superTraitNames) throws Exception {
- HierarchicalTypeDefinition<TraitType> trait =
- TypesUtil.createTraitTypeDef(traitName, ImmutableSet.copyOf(superTraitNames));
+ TraitTypeDefinition traitDef = TypesUtil.createTraitTypeDef(traitName, null, new HashSet<>(Arrays.asList(superTraitNames)));
+ TypesDef typesDef = new TypesDef(Collections.<EnumTypeDefinition>emptyList(),
+ Collections.<StructTypeDefinition>emptyList(),
+ Collections.singletonList(traitDef),
+ Collections.<ClassTypeDefinition>emptyList());
+ String traitDefinitionJSON = AtlasType.toV1Json(typesDef);
- String traitDefinitionJSON = TypesSerialization$.MODULE$.toJson(trait, true);
LOG.debug("Trait definition = {}", traitDefinitionJSON);
+
createType(traitDefinitionJSON);
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/test/java/org/apache/atlas/notification/NotificationEntityChangeListenerTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationEntityChangeListenerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationEntityChangeListenerTest.java
index a988915..084ebb1 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationEntityChangeListenerTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationEntityChangeListenerTest.java
@@ -18,11 +18,10 @@
package org.apache.atlas.notification;
-import org.apache.atlas.typesystem.IStruct;
-import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.Struct;
-import org.apache.atlas.typesystem.types.TraitType;
-import org.apache.atlas.typesystem.types.TypeSystem;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.v1.model.instance.Struct;
+import org.apache.atlas.type.AtlasClassificationType;
+import org.apache.atlas.type.AtlasTypeRegistry;
import org.testng.annotations.Test;
import java.util.Collections;
@@ -41,45 +40,45 @@ public class NotificationEntityChangeListenerTest {
@Test
public void testGetAllTraitsSuperTraits() throws Exception {
- TypeSystem typeSystem = mock(TypeSystem.class);
+ AtlasTypeRegistry typeSystem = mock(AtlasTypeRegistry.class);
String traitName = "MyTrait";
- IStruct myTrait = new Struct(traitName);
+ Struct myTrait = new Struct(traitName);
String superTraitName = "MySuperTrait";
- TraitType traitDef = mock(TraitType.class);
+ AtlasClassificationType traitDef = mock(AtlasClassificationType.class);
Set<String> superTypeNames = Collections.singleton(superTraitName);
- TraitType superTraitDef = mock(TraitType.class);
+ AtlasClassificationType superTraitDef = mock(AtlasClassificationType.class);
Set<String> superSuperTypeNames = Collections.emptySet();
Referenceable entity = getEntity("id", myTrait);
- when(typeSystem.getDataType(TraitType.class, traitName)).thenReturn(traitDef);
- when(typeSystem.getDataType(TraitType.class, superTraitName)).thenReturn(superTraitDef);
+ when(typeSystem.getClassificationTypeByName(traitName)).thenReturn(traitDef);
+ when(typeSystem.getClassificationTypeByName(superTraitName)).thenReturn(superTraitDef);
- when(traitDef.getAllSuperTypeNames()).thenReturn(superTypeNames);
- when(superTraitDef.getAllSuperTypeNames()).thenReturn(superSuperTypeNames);
+ when(traitDef.getAllSuperTypes()).thenReturn(superTypeNames);
+ when(superTraitDef.getAllSuperTypes()).thenReturn(superSuperTypeNames);
- List<IStruct> allTraits = NotificationEntityChangeListener.getAllTraits(entity, typeSystem);
+ List<Struct> allTraits = NotificationEntityChangeListener.getAllTraits(entity, typeSystem);
assertEquals(2, allTraits.size());
- for (IStruct trait : allTraits) {
+ for (Struct trait : allTraits) {
String typeName = trait.getTypeName();
assertTrue(typeName.equals(traitName) || typeName.equals(superTraitName));
}
}
- private Referenceable getEntity(String id, IStruct... traits) {
+ private Referenceable getEntity(String id, Struct... traits) {
String typeName = "typeName";
Map<String, Object> values = new HashMap<>();
List<String> traitNames = new LinkedList<>();
- Map<String, IStruct> traitMap = new HashMap<>();
+ Map<String, Struct> traitMap = new HashMap<>();
- for (IStruct trait : traits) {
+ for (Struct trait : traits) {
String traitName = trait.getTypeName();
traitNames.add(traitName);
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
index d41db3e..f248593 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
@@ -20,14 +20,13 @@ package org.apache.atlas.notification;
import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.kafka.NotificationProvider;
-import org.apache.atlas.notification.hook.HookNotification;
-import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
-import org.apache.atlas.notification.hook.HookNotification.EntityDeleteRequest;
-import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest;
-import org.apache.atlas.notification.hook.HookNotification.EntityCreateRequest;
-import org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequest;
-import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.v1.model.instance.Id;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityDeleteRequest;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityPartialUpdateRequest;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityUpdateRequest;
import org.apache.atlas.web.integration.BaseResourceIT;
import org.codehaus.jettison.json.JSONArray;
import org.testng.annotations.AfterClass;
@@ -40,18 +39,19 @@ import static java.lang.Thread.sleep;
import static org.testng.Assert.assertEquals;
public class NotificationHookConsumerIT extends BaseResourceIT {
-
private static final String TEST_USER = "testuser";
- public static final String NAME = "name";
- public static final String DESCRIPTION = "description";
+
+ public static final String NAME = "name";
+ public static final String DESCRIPTION = "description";
public static final String QUALIFIED_NAME = "qualifiedName";
- public static final String CLUSTER_NAME = "clusterName";
+ public static final String CLUSTER_NAME = "clusterName";
- private NotificationInterface notificationInterface = NotificationProvider.get();
+ private final NotificationInterface notificationInterface = NotificationProvider.get();
@BeforeClass
public void setUp() throws Exception {
super.setUp();
+
createTypeDefinitionsV1();
}
@@ -60,29 +60,33 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
notificationInterface.close();
}
- private void sendHookMessage(HookNotificationMessage message) throws NotificationException, InterruptedException {
+ private void sendHookMessage(HookNotification message) throws NotificationException, InterruptedException {
notificationInterface.send(NotificationInterface.NotificationType.HOOK, message);
+
sleep(1000);
}
@Test
public void testMessageHandleFailureConsumerContinues() throws Exception {
//send invalid message - update with invalid type
- sendHookMessage(new HookNotification.EntityPartialUpdateRequest(TEST_USER, randomString(), null, null,
- new Referenceable(randomString())));
+ sendHookMessage(new EntityPartialUpdateRequest(TEST_USER, randomString(), null, null, new Referenceable(randomString())));
//send valid message
final Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN);
- String dbName = "db" + randomString();
+ final String dbName = "db" + randomString();
+
entity.set(NAME, dbName);
entity.set(DESCRIPTION, randomString());
entity.set(QUALIFIED_NAME, dbName);
entity.set(CLUSTER_NAME, randomString());
+
sendHookMessage(new EntityCreateRequest(TEST_USER, entity));
+
waitFor(MAX_WAIT_TIME, new Predicate() {
@Override
public boolean evaluate() throws Exception {
JSONArray results = searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE_BUILTIN, entity.get(NAME)));
+
return results.length() == 1;
}
});
@@ -91,24 +95,28 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
@Test
public void testCreateEntity() throws Exception {
final Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN);
- String dbName = "db" + randomString();
+ final String dbName = "db" + randomString();
+
entity.set(NAME, dbName);
entity.set(DESCRIPTION, randomString());
entity.set(QUALIFIED_NAME, dbName);
entity.set(CLUSTER_NAME, randomString());
sendHookMessage(new EntityCreateRequest(TEST_USER, entity));
+
waitFor(MAX_WAIT_TIME, new Predicate() {
@Override
public boolean evaluate() throws Exception {
JSONArray results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, entity.get(QUALIFIED_NAME)));
+
return results.length() == 1;
}
});
//Assert that user passed in hook message is used in audit
- Referenceable instance = atlasClientV1.getEntity(DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, (String) entity.get(QUALIFIED_NAME));
- List<EntityAuditEvent> events = atlasClientV1.getEntityAuditEvents(instance.getId()._getId(), (short) 1);
+ Referenceable instance = atlasClientV1.getEntity(DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, (String) entity.get(QUALIFIED_NAME));
+ List<EntityAuditEvent> events = atlasClientV1.getEntityAuditEvents(instance.getId()._getId(), (short) 1);
+
assertEquals(events.size(), 1);
assertEquals(events.get(0).getUser(), TEST_USER);
}
@@ -116,7 +124,8 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
@Test
public void testUpdateEntityPartial() throws Exception {
final Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN);
- final String dbName = "db" + randomString();
+ final String dbName = "db" + randomString();
+
entity.set(NAME, dbName);
entity.set(DESCRIPTION, randomString());
entity.set(QUALIFIED_NAME, dbName);
@@ -125,25 +134,31 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
atlasClientV1.createEntity(entity);
final Referenceable newEntity = new Referenceable(DATABASE_TYPE_BUILTIN);
+
newEntity.set("owner", randomString());
+
sendHookMessage(new EntityPartialUpdateRequest(TEST_USER, DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName, newEntity));
+
waitFor(MAX_WAIT_TIME, new Predicate() {
@Override
public boolean evaluate() throws Exception {
Referenceable localEntity = atlasClientV1.getEntity(DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName);
+
return (localEntity.get("owner") != null && localEntity.get("owner").equals(newEntity.get("owner")));
}
});
//Its partial update and un-set fields are not updated
Referenceable actualEntity = atlasClientV1.getEntity(DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName);
+
assertEquals(actualEntity.get(DESCRIPTION), entity.get(DESCRIPTION));
}
@Test
public void testUpdatePartialUpdatingQualifiedName() throws Exception {
final Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN);
- final String dbName = "db" + randomString();
+ final String dbName = "db" + randomString();
+
entity.set(NAME, dbName);
entity.set(DESCRIPTION, randomString());
entity.set(QUALIFIED_NAME, dbName);
@@ -152,28 +167,32 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
atlasClientV1.createEntity(entity);
final Referenceable newEntity = new Referenceable(DATABASE_TYPE_BUILTIN);
- final String newName = "db" + randomString();
+ final String newName = "db" + randomString();
+
newEntity.set(QUALIFIED_NAME, newName);
sendHookMessage(new EntityPartialUpdateRequest(TEST_USER, DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName, newEntity));
+
waitFor(MAX_WAIT_TIME, new Predicate() {
@Override
public boolean evaluate() throws Exception {
JSONArray results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, newName));
+
return results.length() == 1;
}
});
//no entity with the old qualified name
JSONArray results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, dbName));
- assertEquals(results.length(), 0);
+ assertEquals(results.length(), 0);
}
@Test
public void testDeleteByQualifiedName() throws Exception {
- Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN);
- final String dbName = "db" + randomString();
+ final Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN);
+ final String dbName = "db" + randomString();
+
entity.set(NAME, dbName);
entity.set(DESCRIPTION, randomString());
entity.set(QUALIFIED_NAME, dbName);
@@ -182,10 +201,12 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
final String dbId = atlasClientV1.createEntity(entity).get(0);
sendHookMessage(new EntityDeleteRequest(TEST_USER, DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName));
+
waitFor(MAX_WAIT_TIME, new Predicate() {
@Override
public boolean evaluate() throws Exception {
Referenceable getEntity = atlasClientV1.getEntity(dbId);
+
return getEntity.getId().getState() == Id.EntityState.DELETED;
}
});
@@ -193,8 +214,9 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
@Test
public void testUpdateEntityFullUpdate() throws Exception {
- Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN);
- final String dbName = "db" + randomString();
+ final Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN);
+ final String dbName = "db" + randomString();
+
entity.set(NAME, dbName);
entity.set(DESCRIPTION, randomString());
entity.set(QUALIFIED_NAME, dbName);
@@ -203,6 +225,7 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
atlasClientV1.createEntity(entity);
final Referenceable newEntity = new Referenceable(DATABASE_TYPE_BUILTIN);
+
newEntity.set(NAME, randomString());
newEntity.set(DESCRIPTION, randomString());
newEntity.set("owner", randomString());
@@ -211,18 +234,19 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
//updating unique attribute
sendHookMessage(new EntityUpdateRequest(TEST_USER, newEntity));
+
waitFor(MAX_WAIT_TIME, new Predicate() {
@Override
public boolean evaluate() throws Exception {
JSONArray results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, newEntity.get(QUALIFIED_NAME)));
+
return results.length() == 1;
}
});
Referenceable actualEntity = atlasClientV1.getEntity(DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName);
+
assertEquals(actualEntity.get(DESCRIPTION), newEntity.get(DESCRIPTION));
assertEquals(actualEntity.get("owner"), newEntity.get("owner"));
}
-
-
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index eb37fa8..4ea13c7 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -25,14 +25,15 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.apache.atlas.kafka.KafkaNotification;
import org.apache.atlas.kafka.NotificationProvider;
-import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.v1.model.notification.HookNotificationV1;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v1.EntityStream;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.web.service.ServiceState;
import org.apache.commons.lang.RandomStringUtils;
import org.mockito.Mock;
@@ -41,7 +42,7 @@ import org.testng.Assert;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
-import static org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
+
import java.util.List;
import org.apache.atlas.kafka.AtlasKafkaConsumer;
@@ -57,11 +58,11 @@ import static org.testng.Assert.*;
public class NotificationHookConsumerKafkaTest {
-
- public static final String NAME = "name";
- public static final String DESCRIPTION = "description";
+ public static final String NAME = "name";
+ public static final String DESCRIPTION = "description";
public static final String QUALIFIED_NAME = "qualifiedName";
- private NotificationInterface notificationInterface = NotificationProvider.get();
+
+ private final NotificationInterface notificationInterface = NotificationProvider.get();
@Mock
@@ -81,10 +82,14 @@ public class NotificationHookConsumerKafkaTest {
@BeforeTest
public void setup() throws AtlasException, InterruptedException, AtlasBaseException {
MockitoAnnotations.initMocks(this);
- AtlasType mockType = mock(AtlasType.class);
+
+ AtlasType mockType = mock(AtlasType.class);
+ AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntitiesWithExtInfo.class);
+
when(typeRegistry.getType(anyString())).thenReturn(mockType);
- AtlasEntity.AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntity.AtlasEntitiesWithExtInfo.class);
+
when(instanceConverter.toAtlasEntities(anyList())).thenReturn(mockEntity);
+
kafkaNotification = startKafkaServer();
}
@@ -97,19 +102,20 @@ public class NotificationHookConsumerKafkaTest {
@Test
public void testConsumerConsumesNewMessageWithAutoCommitDisabled() throws AtlasException, InterruptedException, AtlasBaseException {
try {
- produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity()));
+ produceMessage(new HookNotificationV1.EntityCreateRequest("test_user1", createEntity()));
- NotificationConsumer<HookNotificationMessage> consumer = createNewConsumer(kafkaNotification, false);
- NotificationHookConsumer notificationHookConsumer =
- new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
- NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
+ NotificationConsumer<HookNotification> consumer = createNewConsumer(kafkaNotification, false);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+ NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
consumeOneMessage(consumer, hookConsumer);
+
verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean());
// produce another message, and make sure it moves ahead. If commit succeeded, this would work.
- produceMessage(new HookNotification.EntityCreateRequest("test_user2", createEntity()));
+ produceMessage(new HookNotificationV1.EntityCreateRequest("test_user2", createEntity()));
consumeOneMessage(consumer, hookConsumer);
+
verify(atlasEntityStore,times(2)).createOrUpdate(any(EntityStream.class), anyBoolean());
reset(atlasEntityStore);
}
@@ -121,22 +127,20 @@ public class NotificationHookConsumerKafkaTest {
@Test(dependsOnMethods = "testConsumerConsumesNewMessageWithAutoCommitDisabled")
public void testConsumerRemainsAtSameMessageWithAutoCommitEnabled() throws Exception {
try {
- produceMessage(new HookNotification.EntityCreateRequest("test_user3", createEntity()));
+ produceMessage(new HookNotificationV1.EntityCreateRequest("test_user3", createEntity()));
- NotificationConsumer<HookNotificationMessage> consumer = createNewConsumer(kafkaNotification, true);
+ NotificationConsumer<HookNotification> consumer = createNewConsumer(kafkaNotification, true);
assertNotNull (consumer);
- NotificationHookConsumer notificationHookConsumer =
- new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
- NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
-
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+ NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
consumeOneMessage(consumer, hookConsumer);
verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean());
// produce another message, but this will not be consumed, as commit code is not executed in hook consumer.
- produceMessage(new HookNotification.EntityCreateRequest("test_user4", createEntity()));
+ produceMessage(new HookNotificationV1.EntityCreateRequest("test_user4", createEntity()));
consumeOneMessage(consumer, hookConsumer);
verify(atlasEntityStore,times(2)).createOrUpdate(any(EntityStream.class), anyBoolean());
@@ -146,18 +150,19 @@ public class NotificationHookConsumerKafkaTest {
}
}
- AtlasKafkaConsumer<HookNotificationMessage> createNewConsumer(KafkaNotification kafkaNotification, boolean autoCommitEnabled) {
+ AtlasKafkaConsumer<HookNotification> createNewConsumer(KafkaNotification kafkaNotification, boolean autoCommitEnabled) {
return (AtlasKafkaConsumer) kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1, autoCommitEnabled).get(0);
}
- void consumeOneMessage(NotificationConsumer<HookNotificationMessage> consumer,
+ void consumeOneMessage(NotificationConsumer<HookNotification> consumer,
NotificationHookConsumer.HookConsumer hookConsumer) throws InterruptedException {
try {
long startTime = System.currentTimeMillis(); //fetch starting time
+
while ((System.currentTimeMillis() - startTime) < 10000) {
- List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive();
+ List<AtlasKafkaMessage<HookNotification>> messages = consumer.receive();
- for (AtlasKafkaMessage<HookNotificationMessage> msg : messages) {
+ for (AtlasKafkaMessage<HookNotification> msg : messages) {
hookConsumer.handleMessage(msg);
}
@@ -172,19 +177,25 @@ public class NotificationHookConsumerKafkaTest {
Referenceable createEntity() {
final Referenceable entity = new Referenceable(AtlasClient.DATA_SET_SUPER_TYPE);
+
entity.set(NAME, "db" + randomString());
entity.set(DESCRIPTION, randomString());
entity.set(QUALIFIED_NAME, randomString());
+
return entity;
}
KafkaNotification startKafkaServer() throws AtlasException, InterruptedException {
Configuration applicationProperties = ApplicationProperties.get();
+
applicationProperties.setProperty("atlas.kafka.data", "target/" + RandomStringUtils.randomAlphanumeric(5));
kafkaNotification = new KafkaNotification(applicationProperties);
+
kafkaNotification.start();
+
Thread.sleep(2000);
+
return kafkaNotification;
}
@@ -192,8 +203,7 @@ public class NotificationHookConsumerKafkaTest {
return RandomStringUtils.randomAlphanumeric(10);
}
- private void produceMessage(HookNotificationMessage message) throws NotificationException {
+ private void produceMessage(HookNotification message) throws NotificationException {
kafkaNotification.send(NotificationInterface.NotificationType.HOOK, message);
}
-
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
index e2d1022..f8bd9a1 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -22,15 +22,17 @@ import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.kafka.AtlasKafkaMessage;
-import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.EntityMutationResponse;
-import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.model.notification.HookNotification.HookNotificationType;
+import org.apache.atlas.notification.NotificationInterface.NotificationType;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v1.EntityStream;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.web.service.ServiceState;
import org.apache.commons.configuration.Configuration;
import org.apache.kafka.common.TopicPartition;
@@ -43,6 +45,7 @@ import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -75,20 +78,24 @@ public class NotificationHookConsumerTest {
@BeforeMethod
public void setup() throws AtlasBaseException {
MockitoAnnotations.initMocks(this);
- AtlasType mockType = mock(AtlasType.class);
+
+ AtlasType mockType = mock(AtlasType.class);
+ AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntitiesWithExtInfo.class);
+
when(typeRegistry.getType(anyString())).thenReturn(mockType);
- AtlasEntity.AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntity.AtlasEntitiesWithExtInfo.class);
when(instanceConverter.toAtlasEntities(anyList())).thenReturn(mockEntity);
+
EntityMutationResponse mutationResponse = mock(EntityMutationResponse.class);
+
when(atlasEntityStore.createOrUpdate(any(EntityStream.class), anyBoolean())).thenReturn(mutationResponse);
}
@Test
public void testConsumerCanProceedIfServerIsReady() throws Exception {
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
- NotificationHookConsumer.HookConsumer hookConsumer =
- notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
- NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+ NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
+ NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
+
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
assertTrue(hookConsumer.serverAvailable(timer));
@@ -98,10 +105,9 @@ public class NotificationHookConsumerTest {
@Test
public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws Exception {
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
- NotificationHookConsumer.HookConsumer hookConsumer =
- notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
- NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+ NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
+ NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
when(serviceState.getState())
.thenReturn(ServiceState.ServiceStateValue.PASSIVE)
@@ -116,35 +122,30 @@ public class NotificationHookConsumerTest {
@Test
public void testCommitIsCalledWhenMessageIsProcessed() throws AtlasServiceException, AtlasException {
- NotificationHookConsumer notificationHookConsumer =
- new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
- NotificationConsumer consumer = mock(NotificationConsumer.class);
- NotificationHookConsumer.HookConsumer hookConsumer =
- notificationHookConsumer.new HookConsumer(consumer);
- HookNotification.EntityCreateRequest message = mock(HookNotification.EntityCreateRequest.class);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+ NotificationConsumer consumer = mock(NotificationConsumer.class);
+ NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
+ EntityCreateRequest message = mock(EntityCreateRequest.class);
+ Referenceable mock = mock(Referenceable.class);
+
when(message.getUser()).thenReturn("user");
- when(message.getType()).thenReturn(HookNotification.HookNotificationType.ENTITY_CREATE);
- Referenceable mock = mock(Referenceable.class);
+ when(message.getType()).thenReturn(HookNotificationType.ENTITY_CREATE);
when(message.getEntities()).thenReturn(Arrays.asList(mock));
hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, -1));
+
verify(consumer).commit(any(TopicPartition.class), anyInt());
}
@Test
public void testCommitIsNotCalledEvenWhenMessageProcessingFails() throws AtlasServiceException, AtlasException, AtlasBaseException {
- NotificationHookConsumer notificationHookConsumer =
- new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
- NotificationConsumer consumer = mock(NotificationConsumer.class);
- NotificationHookConsumer.HookConsumer hookConsumer =
- notificationHookConsumer.new HookConsumer(consumer);
- HookNotification.EntityCreateRequest message = new HookNotification.EntityCreateRequest("user",
- new ArrayList<Referenceable>() {
- {
- add(mock(Referenceable.class));
- }
- });
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+ NotificationConsumer consumer = mock(NotificationConsumer.class);
+ NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
+ EntityCreateRequest message = new EntityCreateRequest("user", Collections.singletonList(mock(Referenceable.class)));
+
when(atlasEntityStore.createOrUpdate(any(EntityStream.class), anyBoolean())).thenThrow(new RuntimeException("Simulating exception in processing message"));
+
hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, -1));
verifyZeroInteractions(consumer);
@@ -152,10 +153,10 @@ public class NotificationHookConsumerTest {
@Test
public void testConsumerProceedsWithFalseIfInterrupted() throws Exception {
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
- NotificationHookConsumer.HookConsumer hookConsumer =
- notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
- NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+ NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
+ NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
+
doThrow(new InterruptedException()).when(timer).sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS);
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
@@ -164,58 +165,75 @@ public class NotificationHookConsumerTest {
@Test
public void testConsumersStartedIfHAIsDisabled() throws Exception {
+ List<NotificationConsumer<Object>> consumers = new ArrayList();
+ NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class);
+
+ consumers.add(notificationConsumerMock);
+
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false);
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
- List<NotificationConsumer<Object>> consumers = new ArrayList();
- consumers.add(mock(NotificationConsumer.class));
- when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
- thenReturn(consumers);
+ when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
+
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+
notificationHookConsumer.startInternal(configuration, executorService);
- verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1);
+
+ verify(notificationInterface).createConsumers(NotificationType.HOOK, 1);
verify(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
}
@Test
public void testConsumersAreNotStartedIfHAIsEnabled() throws Exception {
+ List<NotificationConsumer<Object>> consumers = new ArrayList();
+ NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class);
+
+ consumers.add(notificationConsumerMock);
+
when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
- List<NotificationConsumer<Object>> consumers = new ArrayList();
- consumers.add(mock(NotificationConsumer.class));
- when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
- thenReturn(consumers);
+ when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
+
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+
notificationHookConsumer.startInternal(configuration, executorService);
+
verifyZeroInteractions(notificationInterface);
}
@Test
public void testConsumersAreStartedWhenInstanceBecomesActive() throws Exception {
+ List<NotificationConsumer<Object>> consumers = new ArrayList();
+ NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class);
+
+ consumers.add(notificationConsumerMock);
+
when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
- List<NotificationConsumer<Object>> consumers = new ArrayList();
- consumers.add(mock(NotificationConsumer.class));
- when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
- thenReturn(consumers);
+ when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
+
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+
notificationHookConsumer.startInternal(configuration, executorService);
notificationHookConsumer.instanceIsActive();
- verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1);
+
+ verify(notificationInterface).createConsumers(NotificationType.HOOK, 1);
verify(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
}
@Test
public void testConsumersAreStoppedWhenInstanceBecomesPassive() throws Exception {
+ List<NotificationConsumer<Object>> consumers = new ArrayList();
+ NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class);
+
+ consumers.add(notificationConsumerMock);
+
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
- List<NotificationConsumer<Object>> consumers = new ArrayList();
- NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class);
- consumers.add(notificationConsumerMock);
+ when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
- when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).thenReturn(consumers);
final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
doAnswer(new Answer() {
@@ -223,12 +241,14 @@ public class NotificationHookConsumerTest {
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
notificationHookConsumer.consumers.get(0).start();
Thread.sleep(500);
+
return null;
}
}).when(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
notificationHookConsumer.startInternal(configuration, executorService);
notificationHookConsumer.instanceIsPassive();
+
verify(notificationInterface).close();
verify(executorService).shutdown();
verify(notificationConsumerMock).wakeup();
@@ -236,18 +256,21 @@ public class NotificationHookConsumerTest {
@Test
public void consumersStoppedBeforeStarting() throws Exception {
+ List<NotificationConsumer<Object>> consumers = new ArrayList();
+ NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class);
+
+ consumers.add(notificationConsumerMock);
+
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
- List<NotificationConsumer<Object>> consumers = new ArrayList();
- NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class);
- consumers.add(notificationConsumerMock);
+ when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
- when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).thenReturn(consumers);
final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
notificationHookConsumer.startInternal(configuration, executorService);
notificationHookConsumer.instanceIsPassive();
+
verify(notificationInterface).close();
verify(executorService).shutdown();
}
@@ -261,13 +284,16 @@ public class NotificationHookConsumerTest {
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
notificationHookConsumer.consumers.get(0).start();
Thread.sleep(1000);
+
return null;
}
}).when(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
notificationHookConsumer.startInternal(configuration, executorService);
Thread.sleep(1000);
+
assertTrue(notificationHookConsumer.consumers.get(0).isAlive());
+
notificationHookConsumer.consumers.get(0).shutdown();
}
@@ -280,27 +306,32 @@ public class NotificationHookConsumerTest {
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
notificationHookConsumer.consumers.get(0).start();
Thread.sleep(500);
+
return null;
}
}).when(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
notificationHookConsumer.startInternal(configuration, executorService);
Thread.sleep(500);
+
notificationHookConsumer.consumers.get(0).shutdown();
Thread.sleep(500);
+
assertFalse(notificationHookConsumer.consumers.get(0).isAlive());
}
private NotificationHookConsumer setupNotificationHookConsumer() throws AtlasException {
+ List<NotificationConsumer<Object>> consumers = new ArrayList();
+ NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class);
+
+ consumers.add(notificationConsumerMock);
+
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
- List<NotificationConsumer<Object>> consumers = new ArrayList();
- NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class);
when(notificationConsumerMock.receive()).thenThrow(new IllegalStateException());
- consumers.add(notificationConsumerMock);
+ when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
- when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).thenReturn(consumers);
return new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
}
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/test/java/org/apache/atlas/util/RestUtilsTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/util/RestUtilsTest.java b/webapp/src/test/java/org/apache/atlas/util/RestUtilsTest.java
index 87259df..0d4af1e 100644
--- a/webapp/src/test/java/org/apache/atlas/util/RestUtilsTest.java
+++ b/webapp/src/test/java/org/apache/atlas/util/RestUtilsTest.java
@@ -22,12 +22,10 @@ import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
@@ -44,24 +42,12 @@ import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeRegistry.AtlasTransientTypeRegistry;
-import org.apache.atlas.typesystem.TypesDef;
-import org.apache.atlas.typesystem.json.TypesSerialization;
-import org.apache.atlas.typesystem.types.AttributeDefinition;
-import org.apache.atlas.typesystem.types.ClassType;
-import org.apache.atlas.typesystem.types.DataTypes;
import org.apache.atlas.typesystem.types.DataTypes.TypeCategory;
-import org.apache.atlas.typesystem.types.EnumTypeDefinition;
-import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
-import org.apache.atlas.typesystem.types.Multiplicity;
-import org.apache.atlas.typesystem.types.StructTypeDefinition;
-import org.apache.atlas.typesystem.types.TraitType;
-import org.apache.atlas.typesystem.types.utils.TypesUtil;
+import org.apache.atlas.v1.model.typedef.*;
+import org.apache.atlas.v1.typesystem.types.utils.TypesUtil;
import org.testng.Assert;
import org.testng.annotations.Test;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-
/**
* Validates that conversion from V1 to legacy types (and back) is consistent. This also tests
* that the conversion logic in AtlasStructDefStoreV1 is consistent with the conversion logic
@@ -76,13 +62,13 @@ public class RestUtilsTest {
// in tables attribute in "database" type is lost. See ATLAS-1528.
public void testBidirectonalCompositeMappingConsistent() throws AtlasBaseException {
- HierarchicalTypeDefinition<ClassType> dbV1Type = TypesUtil.createClassTypeDef("database",
- ImmutableSet.<String> of(), new AttributeDefinition("tables", DataTypes.arrayTypeName("table"),
- Multiplicity.OPTIONAL, true, "containingDatabase"));
+ ClassTypeDefinition dbV1Type = TypesUtil.createClassTypeDef("database", "", Collections.emptySet(),
+ new AttributeDefinition("tables", AtlasBaseTypeDef.getArrayTypeName("table"),
+ Multiplicity.OPTIONAL, true, "containingDatabase"));
- HierarchicalTypeDefinition<ClassType> tableV1Type = TypesUtil.createClassTypeDef("table",
- ImmutableSet.<String> of(),
- new AttributeDefinition("containingDatabase", "database", Multiplicity.OPTIONAL, false, "tables"));
+ ClassTypeDefinition tableV1Type = TypesUtil.createClassTypeDef("table", "", Collections.emptySet(),
+ new AttributeDefinition("containingDatabase", "database",
+ Multiplicity.OPTIONAL, false, "tables"));
testV1toV2toV1Conversion(Arrays.asList(dbV1Type, tableV1Type), new boolean[] { true, false });
}
@@ -92,121 +78,118 @@ public class RestUtilsTest {
// "containingDatabase" is lost
// in "table" attribute in "database". See ATLAS-1528.
public void testBidirectonalNonCompositeMappingConsistent() throws AtlasBaseException {
+ ClassTypeDefinition dbV1Type = TypesUtil.createClassTypeDef("database", "", Collections.emptySet(),
+ new AttributeDefinition("tables", AtlasBaseTypeDef.getArrayTypeName("table"),
+ Multiplicity.OPTIONAL, false, "containingDatabase"));
- HierarchicalTypeDefinition<ClassType> dbV1Type = TypesUtil.createClassTypeDef("database",
- ImmutableSet.<String> of(), new AttributeDefinition("tables", DataTypes.arrayTypeName("table"),
- Multiplicity.OPTIONAL, false, "containingDatabase"));
-
- HierarchicalTypeDefinition<ClassType> tableV1Type = TypesUtil.createClassTypeDef("table",
- ImmutableSet.<String> of(),
- new AttributeDefinition("containingDatabase", "database", Multiplicity.OPTIONAL, false, "tables"));
+ ClassTypeDefinition tableV1Type = TypesUtil.createClassTypeDef("table", "", Collections.emptySet(),
+ new AttributeDefinition("containingDatabase", "database",
+ Multiplicity.OPTIONAL, false, "tables"));
testV1toV2toV1Conversion(Arrays.asList(dbV1Type, tableV1Type), new boolean[] { false, false });
}
private AtlasTypeDefGraphStoreV1 makeTypeStore(AtlasTypeRegistry reg) {
-
AtlasTypeDefGraphStoreV1 result = mock(AtlasTypeDefGraphStoreV1.class);
for (AtlasEntityType type : reg.getAllEntityTypes()) {
- String typeName = type.getTypeName();
+ String typeName = type.getTypeName();
AtlasVertex typeVertex = mock(AtlasVertex.class);
+
when(result.isTypeVertex(eq(typeVertex), any(TypeCategory.class))).thenReturn(true);
- when(typeVertex.getProperty(eq(Constants.TYPE_CATEGORY_PROPERTY_KEY), eq(TypeCategory.class)))
- .thenReturn(TypeCategory.CLASS);
+ when(typeVertex.getProperty(eq(Constants.TYPE_CATEGORY_PROPERTY_KEY), eq(TypeCategory.class))).thenReturn(TypeCategory.CLASS);
String attributeListPropertyKey = AtlasGraphUtilsV1.getTypeDefPropertyKey(typeName);
- when(typeVertex.getProperty(eq(attributeListPropertyKey), eq(List.class)))
- .thenReturn(new ArrayList<>(type.getAllAttributes().keySet()));
+
+ when(typeVertex.getProperty(eq(attributeListPropertyKey), eq(List.class))).thenReturn(new ArrayList<>(type.getAllAttributes().keySet()));
+
for (AtlasAttribute attribute : type.getAllAttributes().values()) {
String attributeDefPropertyKey = AtlasGraphUtilsV1.getTypeDefPropertyKey(typeName, attribute.getName());
- String attributeJson = AtlasStructDefStoreV1.toJsonFromAttribute(attribute);
+ String attributeJson = AtlasStructDefStoreV1.toJsonFromAttribute(attribute);
+
when(typeVertex.getProperty(eq(attributeDefPropertyKey), eq(String.class))).thenReturn(attributeJson);
}
+
when(result.findTypeVertexByName(eq(typeName))).thenReturn(typeVertex);
}
+
return result;
}
- private AtlasAttributeDef convertToJsonAndBack(AtlasTypeRegistry registry, AtlasStructDef structDef,
- AtlasAttributeDef attributeDef, boolean compositeExpected) throws AtlasBaseException {
-
+ private AtlasAttributeDef convertToJsonAndBack(AtlasTypeRegistry registry, AtlasStructDef structDef, AtlasAttributeDef attributeDef, boolean compositeExpected) throws AtlasBaseException {
AtlasTypeDefGraphStoreV1 typeDefStore = makeTypeStore(registry);
- AtlasStructType structType = (AtlasStructType) registry.getType(structDef.getName());
- AtlasAttribute attribute = structType.getAttribute(attributeDef.getName());
- String attribJson = AtlasStructDefStoreV1.toJsonFromAttribute(attribute);
+ AtlasStructType structType = (AtlasStructType) registry.getType(structDef.getName());
+ AtlasAttribute attribute = structType.getAttribute(attributeDef.getName());
+ String attribJson = AtlasStructDefStoreV1.toJsonFromAttribute(attribute);
+ Map attrInfo = AtlasType.fromJson(attribJson, Map.class);
- Map attrInfo = AtlasType.fromJson(attribJson, Map.class);
Assert.assertEquals(attrInfo.get("isComposite"), compositeExpected);
+
return AtlasStructDefStoreV1.toAttributeDefFromJson(structDef, attrInfo, typeDefStore);
}
- private void testV1toV2toV1Conversion(List<HierarchicalTypeDefinition<ClassType>> typesToTest,
- boolean[] compositeExpected) throws AtlasBaseException {
-
+ private void testV1toV2toV1Conversion(List<ClassTypeDefinition> typesToTest, boolean[] compositeExpected) throws AtlasBaseException {
List<AtlasEntityDef> convertedEntityDefs = convertV1toV2(typesToTest);
+ AtlasTypeRegistry registry = createRegistry(convertedEntityDefs);
- AtlasTypeRegistry registry = createRegistry(convertedEntityDefs);
for(int i = 0 ; i < convertedEntityDefs.size(); i++) {
AtlasEntityDef def = convertedEntityDefs.get(i);
+
for (AtlasAttributeDef attrDef : def.getAttributeDefs()) {
AtlasAttributeDef converted = convertToJsonAndBack(registry, def, attrDef, compositeExpected[i]);
+
Assert.assertEquals(converted, attrDef);
}
}
- List<HierarchicalTypeDefinition<ClassType>> convertedBackTypeDefs = convertV2toV1(convertedEntityDefs);
+ List<ClassTypeDefinition> convertedBackTypeDefs = convertV2toV1(convertedEntityDefs);
for (int i = 0; i < typesToTest.size(); i++) {
+ ClassTypeDefinition convertedBack = convertedBackTypeDefs.get(i);
- HierarchicalTypeDefinition<ClassType> convertedBack = convertedBackTypeDefs.get(i);
Assert.assertEquals(convertedBack, typesToTest.get(i));
- AttributeDefinition[] attributeDefinitions = convertedBack.attributeDefinitions;
- if (attributeDefinitions.length > 0) {
- Assert.assertEquals(attributeDefinitions[0].isComposite, compositeExpected[i]);
+
+ List<AttributeDefinition> attributeDefinitions = convertedBack.getAttributeDefinitions();
+
+ if (attributeDefinitions.size() > 0) {
+ Assert.assertEquals(attributeDefinitions.get(0).getIsComposite(), compositeExpected[i]);
}
}
-
}
- private List<HierarchicalTypeDefinition<ClassType>> convertV2toV1(List<AtlasEntityDef> toConvert)
- throws AtlasBaseException {
-
- AtlasTypeRegistry reg = createRegistry(toConvert);
+ private List<ClassTypeDefinition> convertV2toV1(List<AtlasEntityDef> toConvert) throws AtlasBaseException {
+ AtlasTypeRegistry reg = createRegistry(toConvert);
+ List<ClassTypeDefinition> result = new ArrayList<>(toConvert.size());
- List<HierarchicalTypeDefinition<ClassType>> result = new ArrayList<>(toConvert.size());
for (int i = 0; i < toConvert.size(); i++) {
- AtlasEntityDef entityDef = toConvert.get(i);
- AtlasEntityType entity = reg.getEntityTypeByName(entityDef.getName());
- HierarchicalTypeDefinition<ClassType> converted = TypeConverterUtil.toTypesDef(entity, reg)
- .classTypesAsJavaList().get(0);
+ AtlasEntityDef entityDef = toConvert.get(i);
+ AtlasEntityType entity = reg.getEntityTypeByName(entityDef.getName());
+ ClassTypeDefinition converted = TypeConverterUtil.toTypesDef(entity, reg).getClassTypes().get(0);
+
result.add(converted);
}
+
return result;
}
private AtlasTypeRegistry createRegistry(List<AtlasEntityDef> toConvert) throws AtlasBaseException {
- AtlasTypeRegistry reg = new AtlasTypeRegistry();
+ AtlasTypeRegistry reg = new AtlasTypeRegistry();
AtlasTransientTypeRegistry tmp = reg.lockTypeRegistryForUpdate();
+
tmp.addTypes(toConvert);
reg.releaseTypeRegistryForUpdate(tmp, true);
+
return reg;
}
- private List<AtlasEntityDef> convertV1toV2(List<HierarchicalTypeDefinition<ClassType>> types)
- throws AtlasBaseException {
-
- ImmutableList<HierarchicalTypeDefinition<ClassType>> classTypeList = ImmutableList
- .<HierarchicalTypeDefinition<ClassType>> builder().addAll(types).build();
-
- TypesDef toConvert = TypesUtil.getTypesDef(ImmutableList.<EnumTypeDefinition> of(),
- ImmutableList.<StructTypeDefinition> of(), ImmutableList.<HierarchicalTypeDefinition<TraitType>> of(),
- classTypeList);
+ private List<AtlasEntityDef> convertV1toV2(List<ClassTypeDefinition> types) throws AtlasBaseException {
+ List<ClassTypeDefinition> classTypeList = new ArrayList(types);
+ TypesDef toConvert = new TypesDef(Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), classTypeList);
+ String json = AtlasType.toV1Json(toConvert);
+ AtlasTypeRegistry emptyRegistry = new AtlasTypeRegistry();
+ AtlasTypesDef converted = TypeConverterUtil.toAtlasTypesDef(json, emptyRegistry);
+ List<AtlasEntityDef> convertedEntityDefs = converted.getEntityDefs();
- String json = TypesSerialization.toJson(toConvert);
- AtlasTypeRegistry emptyRegistry = new AtlasTypeRegistry();
- AtlasTypesDef converted = TypeConverterUtil.toAtlasTypesDef(json, emptyRegistry);
- List<AtlasEntityDef> convertedEntityDefs = converted.getEntityDefs();
return convertedEntityDefs;
}
}