You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/11/12 18:14:13 UTC
[04/42] atlas git commit: ATLAS-2251: Remove TypeSystem and related
implementation, to avoid unncessary duplicate of type details in cache
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
index 53acf56..4633de9 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
@@ -21,44 +21,34 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.listener.EntityChangeListener;
-import org.apache.atlas.notification.entity.EntityNotification;
-import org.apache.atlas.notification.entity.EntityNotificationImpl;
+import org.apache.atlas.notification.NotificationInterface.NotificationType;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.v1.model.instance.Struct;
+import org.apache.atlas.v1.model.notification.EntityNotificationV1;
+import org.apache.atlas.v1.model.notification.EntityNotificationV1.OperationType;
import org.apache.atlas.repository.graph.GraphHelper;
-import org.apache.atlas.typesystem.IReferenceableInstance;
-import org.apache.atlas.typesystem.IStruct;
-import org.apache.atlas.typesystem.ITypedReferenceableInstance;
-import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.Struct;
-import org.apache.atlas.typesystem.types.FieldMapping;
-import org.apache.atlas.typesystem.types.TraitType;
-import org.apache.atlas.typesystem.types.TypeSystem;
+import org.apache.atlas.type.AtlasClassificationType;
+import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.configuration.Configuration;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
/**
* Listen to the repository for entity changes and produce entity change notifications.
*/
@Component
public class NotificationEntityChangeListener implements EntityChangeListener {
+ private static final String ATLAS_ENTITY_NOTIFICATION_PROPERTY = "atlas.notification.entity";
- private final NotificationInterface notificationInterface;
- private final TypeSystem typeSystem;
+ private final NotificationInterface notificationInterface;
+ private final AtlasTypeRegistry typeRegistry;
+ private final Map<String, List<String>> notificationAttributesCache = new HashMap<>();
- private Map<String, List<String>> notificationAttributesCache = new HashMap<>();
- private static final String ATLAS_ENTITY_NOTIFICATION_PROPERTY = "atlas.notification.entity";
- static Configuration APPLICATION_PROPERTIES = null;
+ private static Configuration APPLICATION_PROPERTIES = null;
@@ -68,45 +58,45 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
* Construct a NotificationEntityChangeListener.
*
* @param notificationInterface the notification framework interface
- * @param typeSystem the Atlas type system
+ * @param typeRegistry the Atlas type system
*/
@Inject
- public NotificationEntityChangeListener(NotificationInterface notificationInterface, TypeSystem typeSystem) {
+ public NotificationEntityChangeListener(NotificationInterface notificationInterface, AtlasTypeRegistry typeRegistry) {
this.notificationInterface = notificationInterface;
- this.typeSystem = typeSystem;
+ this.typeRegistry = typeRegistry;
}
// ----- EntityChangeListener ----------------------------------------------
@Override
- public void onEntitiesAdded(Collection<ITypedReferenceableInstance> entities, boolean isImport) throws AtlasException {
- notifyOfEntityEvent(entities, EntityNotification.OperationType.ENTITY_CREATE);
+ public void onEntitiesAdded(Collection<Referenceable> entities, boolean isImport) throws AtlasException {
+ notifyOfEntityEvent(entities, OperationType.ENTITY_CREATE);
}
@Override
- public void onEntitiesUpdated(Collection<ITypedReferenceableInstance> entities, boolean isImport) throws AtlasException {
- notifyOfEntityEvent(entities, EntityNotification.OperationType.ENTITY_UPDATE);
+ public void onEntitiesUpdated(Collection<Referenceable> entities, boolean isImport) throws AtlasException {
+ notifyOfEntityEvent(entities, OperationType.ENTITY_UPDATE);
}
@Override
- public void onTraitsAdded(ITypedReferenceableInstance entity, Collection<? extends IStruct> traits) throws AtlasException {
- notifyOfEntityEvent(Collections.singleton(entity), EntityNotification.OperationType.TRAIT_ADD);
+ public void onTraitsAdded(Referenceable entity, Collection<? extends Struct> traits) throws AtlasException {
+ notifyOfEntityEvent(Collections.singleton(entity), OperationType.TRAIT_ADD);
}
@Override
- public void onTraitsDeleted(ITypedReferenceableInstance entity, Collection<String> traitNames) throws AtlasException {
- notifyOfEntityEvent(Collections.singleton(entity), EntityNotification.OperationType.TRAIT_DELETE);
+ public void onTraitsDeleted(Referenceable entity, Collection<String> traitNames) throws AtlasException {
+ notifyOfEntityEvent(Collections.singleton(entity), OperationType.TRAIT_DELETE);
}
@Override
- public void onTraitsUpdated(ITypedReferenceableInstance entity, Collection<? extends IStruct> traits) throws AtlasException {
- notifyOfEntityEvent(Collections.singleton(entity), EntityNotification.OperationType.TRAIT_UPDATE);
+ public void onTraitsUpdated(Referenceable entity, Collection<? extends Struct> traits) throws AtlasException {
+ notifyOfEntityEvent(Collections.singleton(entity), OperationType.TRAIT_UPDATE);
}
@Override
- public void onEntitiesDeleted(Collection<ITypedReferenceableInstance> entities, boolean isImport) throws AtlasException {
- notifyOfEntityEvent(entities, EntityNotification.OperationType.ENTITY_DELETE);
+ public void onEntitiesDeleted(Collection<Referenceable> entities, boolean isImport) throws AtlasException {
+ notifyOfEntityEvent(entities, OperationType.ENTITY_DELETE);
}
@@ -115,57 +105,52 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
// ----- helper methods ----------------------------------------------------
@VisibleForTesting
- public static List<IStruct> getAllTraits(IReferenceableInstance entityDefinition,
- TypeSystem typeSystem) throws AtlasException {
- List<IStruct> traitInfo = new LinkedList<>();
- for (String traitName : entityDefinition.getTraits()) {
- IStruct trait = entityDefinition.getTrait(traitName);
- String typeName = trait.getTypeName();
- Map<String, Object> valuesMap = trait.getValuesMap();
- traitInfo.add(new Struct(typeName, valuesMap));
- traitInfo.addAll(getSuperTraits(typeName, valuesMap, typeSystem));
- }
- return traitInfo;
- }
+ public static List<Struct> getAllTraits(Referenceable entityDefinition, AtlasTypeRegistry typeRegistry) throws AtlasException {
+ List<Struct> ret = new ArrayList<>();
- private static List<IStruct> getSuperTraits(
- String typeName, Map<String, Object> values, TypeSystem typeSystem) throws AtlasException {
+ for (String traitName : entityDefinition.getTraitNames()) {
+ Struct trait = entityDefinition.getTrait(traitName);
+ AtlasClassificationType traitType = typeRegistry.getClassificationTypeByName(traitName);
+ Set<String> superTypeNames = traitType != null ? traitType.getAllSuperTypes() : null;
- List<IStruct> superTypes = new LinkedList<>();
+ ret.add(trait);
- TraitType traitDef = typeSystem.getDataType(TraitType.class, typeName);
- Set<String> superTypeNames = traitDef.getAllSuperTypeNames();
+ if (CollectionUtils.isNotEmpty(superTypeNames)) {
+ for (String superTypeName : superTypeNames) {
+ Struct superTypeTrait = new Struct(superTypeName);
- for (String superTypeName : superTypeNames) {
- TraitType superTraitDef = typeSystem.getDataType(TraitType.class, superTypeName);
+ if (MapUtils.isNotEmpty(trait.getValues())) {
+ AtlasClassificationType superType = typeRegistry.getClassificationTypeByName(superTypeName);
- Map<String, Object> superTypeValues = new HashMap<>();
+ if (superType != null && MapUtils.isNotEmpty(superType.getAllAttributes())) {
+ Map<String, Object> superTypeTraitAttributes = new HashMap<>();
- FieldMapping fieldMapping = superTraitDef.fieldMapping();
+ for (Map.Entry<String, Object> attrEntry : trait.getValues().entrySet()) {
+ String attrName = attrEntry.getKey();
- if (fieldMapping != null) {
- Set<String> superTypeAttributeNames = fieldMapping.fields.keySet();
+ if (superType.getAllAttributes().containsKey(attrName)) {
+ superTypeTraitAttributes.put(attrName, attrEntry.getValue());
+ }
+ }
- for (String superTypeAttributeName : superTypeAttributeNames) {
- if (values.containsKey(superTypeAttributeName)) {
- superTypeValues.put(superTypeAttributeName, values.get(superTypeAttributeName));
+ superTypeTrait.setValues(superTypeTraitAttributes);
+ }
}
+
+ ret.add(superTypeTrait);
}
}
- IStruct superTrait = new Struct(superTypeName, superTypeValues);
- superTypes.add(superTrait);
- superTypes.addAll(getSuperTraits(superTypeName, values, typeSystem));
}
- return superTypes;
+ return ret;
}
// send notification of entity change
- private void notifyOfEntityEvent(Collection<ITypedReferenceableInstance> entityDefinitions,
- EntityNotification.OperationType operationType) throws AtlasException {
- List<EntityNotification> messages = new LinkedList<>();
+ private void notifyOfEntityEvent(Collection<Referenceable> entityDefinitions,
+ OperationType operationType) throws AtlasException {
+ List<EntityNotificationV1> messages = new ArrayList<>();
- for (IReferenceableInstance entityDefinition : entityDefinitions) {
+ for (Referenceable entityDefinition : entityDefinitions) {
if(GraphHelper.isInternalType(entityDefinition.getTypeName())) {
continue;
}
@@ -182,13 +167,13 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
}
}
- EntityNotificationImpl notification = new EntityNotificationImpl(entity, operationType, getAllTraits(entity, typeSystem));
+ EntityNotificationV1 notification = new EntityNotificationV1(entity, operationType, getAllTraits(entity, typeRegistry));
messages.add(notification);
}
if (!messages.isEmpty()) {
- notificationInterface.send(NotificationInterface.NotificationType.ENTITIES, messages);
+ notificationInterface.send(NotificationType.ENTITIES, messages);
}
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 4646bff..456a778 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -25,17 +25,18 @@ import org.apache.atlas.AtlasBaseClient;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
-import org.apache.atlas.RequestContext;
import org.apache.atlas.RequestContextV1;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.apache.atlas.listener.ActiveStateChangeHandler;
-import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.notification.hook.HookNotification.EntityCreateRequest;
-import org.apache.atlas.notification.hook.HookNotification.EntityDeleteRequest;
-import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest;
-import org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequest;
-import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.notification.NotificationInterface.NotificationType;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityDeleteRequest;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityPartialUpdateRequest;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityUpdateRequest;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream;
@@ -43,7 +44,6 @@ import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1;
import org.apache.atlas.service.Service;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.atlas.web.filters.AuditFilter;
import org.apache.atlas.web.service.ServiceState;
@@ -57,10 +57,7 @@ import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
+import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -77,37 +74,37 @@ import static org.apache.atlas.AtlasClientV2.API_V2.UPDATE_ENTITY_BY_ATTRIBUTE;
@Order(4)
@DependsOn(value = {"atlasTypeDefStoreInitializer", "atlasTypeDefGraphStoreV1"})
public class NotificationHookConsumer implements Service, ActiveStateChangeHandler {
- private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class);
- private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger(NotificationHookConsumer.class);
- private static final String LOCALHOST = "localhost";
- private static Logger FAILED_LOG = LoggerFactory.getLogger("FAILED");
+ private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class);
+ private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger(NotificationHookConsumer.class);
+ private static final Logger FAILED_LOG = LoggerFactory.getLogger("FAILED");
+ private static final String LOCALHOST = "localhost";
private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName();
- public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
- public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries";
+ public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
+ public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries";
public static final String CONSUMER_FAILEDCACHESIZE_PROPERTY = "atlas.notification.hook.failedcachesize";
- public static final String CONSUMER_RETRY_INTERVAL = "atlas.notification.consumer.retry.interval";
- public static final String CONSUMER_MIN_RETRY_INTERVAL = "atlas.notification.consumer.min.retry.interval";
- public static final String CONSUMER_MAX_RETRY_INTERVAL = "atlas.notification.consumer.max.retry.interval";
-
+ public static final String CONSUMER_RETRY_INTERVAL = "atlas.notification.consumer.retry.interval";
+ public static final String CONSUMER_MIN_RETRY_INTERVAL = "atlas.notification.consumer.min.retry.interval";
+ public static final String CONSUMER_MAX_RETRY_INTERVAL = "atlas.notification.consumer.max.retry.interval";
public static final int SERVER_READY_WAIT_TIME_MS = 1000;
- private final AtlasEntityStore atlasEntityStore;
- private final ServiceState serviceState;
+
+ private final AtlasEntityStore atlasEntityStore;
+ private final ServiceState serviceState;
private final AtlasInstanceConverter instanceConverter;
- private final AtlasTypeRegistry typeRegistry;
- private final int maxRetries;
- private final int failedMsgCacheSize;
+ private final AtlasTypeRegistry typeRegistry;
+ private final int maxRetries;
+ private final int failedMsgCacheSize;
+ private final int minWaitDuration;
+ private final int maxWaitDuration;
+
+ private NotificationInterface notificationInterface;
+ private ExecutorService executors;
+ private Configuration applicationProperties;
@VisibleForTesting
final int consumerRetryInterval;
- private final int minWaitDuration;
- private final int maxWaitDuration;
-
- private NotificationInterface notificationInterface;
- private ExecutorService executors;
- private Configuration applicationProperties;
@VisibleForTesting
List<HookConsumer> consumers;
@@ -117,18 +114,17 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
ServiceState serviceState, AtlasInstanceConverter instanceConverter,
AtlasTypeRegistry typeRegistry) throws AtlasException {
this.notificationInterface = notificationInterface;
- this.atlasEntityStore = atlasEntityStore;
- this.serviceState = serviceState;
- this.instanceConverter = instanceConverter;
- this.typeRegistry = typeRegistry;
-
+ this.atlasEntityStore = atlasEntityStore;
+ this.serviceState = serviceState;
+ this.instanceConverter = instanceConverter;
+ this.typeRegistry = typeRegistry;
this.applicationProperties = ApplicationProperties.get();
- maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3);
- failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 20);
+ maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3);
+ failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 20);
consumerRetryInterval = applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500);
- minWaitDuration = applicationProperties.getInt(CONSUMER_MIN_RETRY_INTERVAL, consumerRetryInterval); // 500 ms by default
- maxWaitDuration = applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 60); // 30 sec by default
+ minWaitDuration = applicationProperties.getInt(CONSUMER_MIN_RETRY_INTERVAL, consumerRetryInterval); // 500 ms by default
+ maxWaitDuration = applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 60); // 30 sec by default
}
@Override
@@ -145,21 +141,24 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
if (!HAConfiguration.isHAEnabled(configuration)) {
LOG.info("HA is disabled, starting consumers inline.");
+
startConsumers(executorService);
}
}
private void startConsumers(ExecutorService executorService) {
- int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
- List<NotificationConsumer<HookNotificationMessage>> notificationConsumers =
- notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads);
+ int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
+ List<NotificationConsumer<HookNotification>> notificationConsumers = notificationInterface.createConsumers(NotificationType.HOOK, numThreads);
+
if (executorService == null) {
- executorService = Executors.newFixedThreadPool(notificationConsumers.size(),
- new ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").build());
+ executorService = Executors.newFixedThreadPool(notificationConsumers.size(), new ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").build());
}
+
executors = executorService;
- for (final NotificationConsumer<HookNotificationMessage> consumer : notificationConsumers) {
+
+ for (final NotificationConsumer<HookNotification> consumer : notificationConsumers) {
HookConsumer hookConsumer = new HookConsumer(consumer);
+
consumers.add(hookConsumer);
executors.submit(hookConsumer);
}
@@ -172,11 +171,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
stopConsumerThreads();
if (executors != null) {
executors.shutdown();
+
if (!executors.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
LOG.error("Timed out waiting for consumer threads to shut down, exiting uncleanly");
}
+
executors = null;
}
+
notificationInterface.close();
} catch (InterruptedException e) {
LOG.error("Failure in shutting down consumers");
@@ -190,6 +192,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
for (HookConsumer consumer : consumers) {
consumer.shutdown();
}
+
consumers.clear();
}
@@ -205,6 +208,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
@Override
public void instanceIsActive() {
LOG.info("Reacting to active state: initializing Kafka consumers");
+
startConsumers(executors);
}
@@ -217,6 +221,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
@Override
public void instanceIsPassive() {
LOG.info("Reacting to passive state: shutting down Kafka consumers.");
+
stop();
}
@@ -236,18 +241,17 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private final long maxDuration;
private final long minDuration;
private final long resetInterval;
+ private long lastWaitAt;
- private long lastWaitAt;
@VisibleForTesting
long waitDuration;
public AdaptiveWaiter(long minDuration, long maxDuration, long increment) {
- this.minDuration = minDuration;
- this.maxDuration = maxDuration;
- this.increment = increment;
-
- this.waitDuration = minDuration;
- this.lastWaitAt = 0;
+ this.minDuration = minDuration;
+ this.maxDuration = maxDuration;
+ this.increment = increment;
+ this.waitDuration = minDuration;
+ this.lastWaitAt = 0;
this.resetInterval = maxDuration * 2;
}
@@ -269,7 +273,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private void setWaitDurations() {
long timeSinceLastWait = (lastWaitAt == 0) ? 0 : System.currentTimeMillis() - lastWaitAt;
+
lastWaitAt = System.currentTimeMillis();
+
if (timeSinceLastWait > resetInterval) {
waitDuration = minDuration;
} else {
@@ -283,14 +289,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
@VisibleForTesting
class HookConsumer extends ShutdownableThread {
- private final NotificationConsumer<HookNotificationMessage> consumer;
- private final AtomicBoolean shouldRun = new AtomicBoolean(false);
- private List<HookNotificationMessage> failedMessages = new ArrayList<>();
-
- private final AdaptiveWaiter adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration);
+ private final NotificationConsumer<HookNotification> consumer;
+ private final AtomicBoolean shouldRun = new AtomicBoolean(false);
+ private final List<HookNotification> failedMessages = new ArrayList<>();
+ private final AdaptiveWaiter adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration);
- public HookConsumer(NotificationConsumer<HookNotificationMessage> consumer) {
+ public HookConsumer(NotificationConsumer<HookNotification> consumer) {
super("atlas-hook-consumer-thread", false);
+
this.consumer = consumer;
}
@@ -307,8 +313,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
try {
while (shouldRun.get()) {
try {
- List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive();
- for (AtlasKafkaMessage<HookNotificationMessage> msg : messages) {
+ List<AtlasKafkaMessage<HookNotification>> messages = consumer.receive();
+
+ for (AtlasKafkaMessage<HookNotification> msg : messages) {
handleMessage(msg);
}
} catch (IllegalStateException ex) {
@@ -316,6 +323,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
} catch (Exception e) {
if (shouldRun.get()) {
LOG.warn("Exception in NotificationHookConsumer", e);
+
adaptiveWaiter.pause(e);
} else {
break;
@@ -325,6 +333,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
} finally {
if (consumer != null) {
LOG.info("closing NotificationConsumer");
+
consumer.close();
}
@@ -333,11 +342,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
@VisibleForTesting
- void handleMessage(AtlasKafkaMessage<HookNotificationMessage> kafkaMsg) throws AtlasServiceException, AtlasException {
- AtlasPerfTracer perf = null;
-
- HookNotificationMessage message = kafkaMsg.getMessage();
- String messageUser = message.getUser();
+ void handleMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) throws AtlasServiceException, AtlasException {
+ AtlasPerfTracer perf = null;
+ HookNotification message = kafkaMsg.getMessage();
+ String messageUser = message.getUser();
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, message.getType().name());
@@ -345,21 +353,25 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
try {
// Used for intermediate conversions during create and update
- AtlasEntity.AtlasEntitiesWithExtInfo entities;
+ AtlasEntitiesWithExtInfo entities = null;
+
for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
if (LOG.isDebugEnabled()) {
LOG.debug("handleMessage({}): attempt {}", message.getType().name(), numRetries);
}
+
try {
- RequestContext requestContext = RequestContext.createContext();
+ RequestContextV1 requestContext = RequestContextV1.get();
+
requestContext.setUser(messageUser);
switch (message.getType()) {
case ENTITY_CREATE:
- EntityCreateRequest createRequest = (EntityCreateRequest) message;
+ final EntityCreateRequest createRequest = (EntityCreateRequest) message;
if (numRetries == 0) { // audit only on the first attempt
AtlasBaseClient.API api = AtlasClient.API_V1.CREATE_ENTITY;
+
audit(messageUser, api.getMethod(), api.getNormalizedPath());
}
@@ -373,19 +385,16 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
if (numRetries == 0) { // audit only on the first attempt
AtlasBaseClient.API api = UPDATE_ENTITY_BY_ATTRIBUTE;
- audit(messageUser, api.getMethod(),
- String.format(api.getNormalizedPath(), partialUpdateRequest.getTypeName()));
+
+ audit(messageUser, api.getMethod(), String.format(api.getNormalizedPath(), partialUpdateRequest.getTypeName()));
}
Referenceable referenceable = partialUpdateRequest.getEntity();
+
entities = instanceConverter.toAtlasEntity(referenceable);
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName());
- String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, new HashMap<String, Object>() {
- {
- put(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue());
- }
- });
+ String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, Collections.singletonMap(partialUpdateRequest.getAttribute(), (Object)partialUpdateRequest.getAttributeValue()));
// There should only be one root entity
entities.getEntities().get(0).setGuid(guid);
@@ -398,30 +407,30 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
if (numRetries == 0) { // audit only on the first attempt
AtlasBaseClient.API api = DELETE_ENTITY_BY_ATTRIBUTE;
- audit(messageUser, api.getMethod(),
- String.format(api.getNormalizedPath(), deleteRequest.getTypeName()));
+
+ audit(messageUser, api.getMethod(), String.format(api.getNormalizedPath(), deleteRequest.getTypeName()));
}
try {
AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(deleteRequest.getTypeName());
- atlasEntityStore.deleteByUniqueAttributes(type,
- new HashMap<String, Object>() {{
- put(deleteRequest.getAttribute(), deleteRequest.getAttributeValue());
- }});
+
+ atlasEntityStore.deleteByUniqueAttributes(type, Collections.singletonMap(deleteRequest.getAttribute(), (Object) deleteRequest.getAttributeValue()));
} catch (ClassCastException cle) {
LOG.error("Failed to do a partial update on Entity");
}
break;
case ENTITY_FULL_UPDATE:
- EntityUpdateRequest updateRequest = (EntityUpdateRequest) message;
+ final EntityUpdateRequest updateRequest = (EntityUpdateRequest) message;
if (numRetries == 0) { // audit only on the first attempt
AtlasBaseClient.API api = UPDATE_ENTITY;
+
audit(messageUser, api.getMethod(), api.getNormalizedPath());
}
entities = instanceConverter.toAtlasEntities(updateRequest.getEntities());
+
atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
break;
@@ -434,6 +443,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
LOG.warn("Error handling message", e);
try {
LOG.info("Sleeping for {} ms before retry", consumerRetryInterval);
+
Thread.sleep(consumerRetryInterval);
} catch (InterruptedException ie) {
LOG.error("Notification consumer thread sleep interrupted");
@@ -441,14 +451,15 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
if (numRetries == (maxRetries - 1)) {
LOG.warn("Max retries exceeded for message {}", message, e);
+
failedMessages.add(message);
+
if (failedMessages.size() >= failedMsgCacheSize) {
recordFailedMessages();
}
return;
}
} finally {
- RequestContext.clear();
RequestContextV1.clear();
}
}
@@ -460,15 +471,18 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private void recordFailedMessages() {
//logging failed messages
- for (HookNotificationMessage message : failedMessages) {
+ for (HookNotification message : failedMessages) {
FAILED_LOG.error("[DROPPED_NOTIFICATION] {}", AbstractNotification.getMessageJson(message));
}
+
failedMessages.clear();
}
- private void commit(AtlasKafkaMessage<HookNotificationMessage> kafkaMessage) {
+ private void commit(AtlasKafkaMessage<HookNotification> kafkaMessage) {
recordFailedMessages();
+
TopicPartition partition = new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition());
+
consumer.commit(partition, kafkaMessage.getOffset() + 1);
}
@@ -476,22 +490,23 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
try {
while (serviceState.getState() != ServiceState.ServiceStateValue.ACTIVE) {
try {
- LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...",
- SERVER_READY_WAIT_TIME_MS);
+ LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...", SERVER_READY_WAIT_TIME_MS);
+
timer.sleep(SERVER_READY_WAIT_TIME_MS);
} catch (InterruptedException e) {
- LOG.info("Interrupted while waiting for Atlas Server to become ready, "
- + "exiting consumer thread.", e);
+ LOG.info("Interrupted while waiting for Atlas Server to become ready, " + "exiting consumer thread.", e);
+
return false;
}
}
} catch (Throwable e) {
- LOG.info(
- "Handled AtlasServiceException while waiting for Atlas Server to become ready, "
- + "exiting consumer thread.", e);
+ LOG.info("Handled AtlasServiceException while waiting for Atlas Server to become ready, exiting consumer thread.", e);
+
return false;
}
+
LOG.info("Atlas Server is ready, can start reading Kafka events.");
+
return true;
}
@@ -506,12 +521,15 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
super.initiateShutdown();
+
shouldRun.set(false);
+
if (consumer != null) {
consumer.wakeup();
}
super.awaitShutdown();
+
LOG.info("<== HookConsumer shutdown()");
}
}
@@ -521,7 +539,6 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
LOG.debug("==> audit({},{}, {})", messageUser, method, path);
}
- AuditFilter.audit(messageUser, THREADNAME_PREFIX, method, LOCALHOST, path, LOCALHOST,
- DateTimeHelper.formatDateUTC(new Date()));
+ AuditFilter.audit(messageUser, THREADNAME_PREFIX, method, LOCALHOST, path, LOCALHOST, DateTimeHelper.formatDateUTC(new Date()));
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/main/java/org/apache/atlas/web/errors/NotFoundExceptionMapper.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/errors/NotFoundExceptionMapper.java b/webapp/src/main/java/org/apache/atlas/web/errors/NotFoundExceptionMapper.java
index a33d8d7..bc0b440 100644
--- a/webapp/src/main/java/org/apache/atlas/web/errors/NotFoundExceptionMapper.java
+++ b/webapp/src/main/java/org/apache/atlas/web/errors/NotFoundExceptionMapper.java
@@ -17,7 +17,7 @@
*/
package org.apache.atlas.web.errors;
-import org.apache.atlas.typesystem.exception.NotFoundException;
+import org.apache.atlas.exception.NotFoundException;
import org.springframework.stereotype.Component;
import javax.ws.rs.core.Response;
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java b/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java
index e8020db..1d553e0 100644
--- a/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java
+++ b/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java
@@ -19,7 +19,7 @@
package org.apache.atlas.web.filters;
import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.RequestContext;
+import org.apache.atlas.RequestContextV1;
import org.apache.atlas.security.SecurityProperties;
import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.atlas.web.security.AtlasAuthenticationProvider;
@@ -311,7 +311,7 @@ public class AtlasAuthenticationFilter extends AuthenticationFilter {
try {
String requestUser = httpRequest.getRemoteUser();
NDC.push(requestUser + ":" + httpRequest.getMethod() + httpRequest.getRequestURI());
- RequestContext requestContext = RequestContext.get();
+ RequestContextV1 requestContext = RequestContextV1.get();
if (requestContext != null) {
requestContext.setUser(requestUser);
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java b/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
index 191388a..3225b0e 100755
--- a/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
+++ b/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
@@ -20,7 +20,6 @@ package org.apache.atlas.web.filters;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
-import org.apache.atlas.RequestContext;
import org.apache.atlas.RequestContextV1;
import org.apache.atlas.metrics.Metrics;
import org.apache.atlas.util.AtlasRepositoryConfiguration;
@@ -70,7 +69,8 @@ public class AuditFilter implements Filter {
try {
currentThread.setName(formatName(oldName, requestId));
- RequestContext requestContext = RequestContext.createContext();
+ RequestContextV1.clear();
+ RequestContextV1 requestContext = RequestContextV1.get();
requestContext.setUser(user);
recordAudit(httpRequest, requestTimeISO9601, user);
filterChain.doFilter(request, response);
@@ -79,7 +79,6 @@ public class AuditFilter implements Filter {
((HttpServletResponse) response).setHeader(AtlasClient.REQUEST_ID, requestId);
currentThread.setName(oldName);
recordMetrics();
- RequestContext.clear();
RequestContextV1.clear();
}
}
@@ -120,7 +119,7 @@ public class AuditFilter implements Filter {
public static void recordMetrics() {
//record metrics
- Metrics requestMetrics = RequestContext.getMetrics();
+ Metrics requestMetrics = RequestContextV1.getMetrics();
if (!requestMetrics.isEmpty()) {
METRICS_LOG.info("{}", requestMetrics);
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/main/java/org/apache/atlas/web/resources/DataSetLineageResource.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/DataSetLineageResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/DataSetLineageResource.java
index 435659e..5660c5b 100644
--- a/webapp/src/main/java/org/apache/atlas/web/resources/DataSetLineageResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/DataSetLineageResource.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,13 +18,21 @@
package org.apache.atlas.web.resources;
-import org.apache.atlas.AtlasClient;
-import org.apache.atlas.discovery.DiscoveryException;
-import org.apache.atlas.discovery.LineageService;
-import org.apache.atlas.typesystem.exception.EntityNotFoundException;
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.discovery.AtlasLineageService;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.lineage.AtlasLineageInfo;
+import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfTracer;
+import org.apache.atlas.v1.model.lineage.DataSetLineageResponse;
+import org.apache.atlas.v1.model.lineage.SchemaResponse;
+import org.apache.atlas.web.util.LineageUtils;
import org.apache.atlas.web.util.Servlets;
-import org.codehaus.jettison.json.JSONObject;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
@@ -32,9 +40,18 @@ import org.springframework.stereotype.Service;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.*;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.atlas.v1.model.lineage.SchemaResponse.SchemaDetails;
/**
* Jersey Resource for Hive Table Lineage.
@@ -45,20 +62,18 @@ import javax.ws.rs.core.Response;
@Deprecated
public class DataSetLineageResource {
- private static final Logger LOG = LoggerFactory.getLogger(DataSetLineageResource.class);
+ private static final Logger LOG = LoggerFactory.getLogger(DataSetLineageResource.class);
private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("rest.DataSetLineageResource");
- private final LineageService lineageService;
+ private final AtlasLineageService atlasLineageService;
+ private final AtlasTypeRegistry typeRegistry;
+ private final AtlasEntityStore atlasEntityStore;
- /**
- * Created by the Guice ServletModule and injected with the
- * configured LineageService.
- *
- * @param lineageService lineage service handle
- */
@Inject
- public DataSetLineageResource(LineageService lineageService) {
- this.lineageService = lineageService;
+ public DataSetLineageResource(final AtlasLineageService atlasLineageService, final AtlasTypeRegistry typeRegistry, final AtlasEntityStore atlasEntityStore) {
+ this.atlasLineageService = atlasLineageService;
+ this.typeRegistry = typeRegistry;
+ this.atlasEntityStore = atlasEntityStore;
}
/**
@@ -70,30 +85,28 @@ public class DataSetLineageResource {
@Path("table/{tableName}/inputs/graph")
@Consumes(Servlets.JSON_MEDIA_TYPE)
@Produces(Servlets.JSON_MEDIA_TYPE)
- public Response inputsGraph(@Context HttpServletRequest request, @PathParam("tableName") String tableName) {
+ public DataSetLineageResponse inputsGraph(@Context HttpServletRequest request, @PathParam("tableName") String tableName) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> DataSetLineageResource.inputsGraph({})", tableName);
}
- AtlasPerfTracer perf = null;
+ DataSetLineageResponse ret = new DataSetLineageResponse();
+ AtlasPerfTracer perf = null;
try {
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "DataSetLineageResource.inputsGraph(tableName=" + tableName + ")");
}
- final String jsonResult = lineageService.getInputsGraph(tableName);
+ String guid = getGuid(tableName);
- JSONObject response = new JSONObject();
- response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
- response.put("tableName", tableName);
- response.put(AtlasClient.RESULTS, new JSONObject(jsonResult));
+ AtlasLineageInfo lineageInfo = atlasLineageService.getAtlasLineageInfo(guid, LineageDirection.INPUT, -1);
+ ret.setTableName(tableName);
+ ret.setRequestId(Servlets.getRequestId());
+ ret.setResults(LineageUtils.toLineageStruct(lineageInfo, typeRegistry));
- return Response.ok(response).build();
- } catch (EntityNotFoundException e) {
- LOG.error("table entity not found for {}", tableName);
- throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND));
- } catch (DiscoveryException | IllegalArgumentException e) {
+ return ret;
+ } catch (IllegalArgumentException e) {
LOG.error("Unable to get lineage inputs graph for table {}", tableName, e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
} catch (WebApplicationException e) {
@@ -116,30 +129,28 @@ public class DataSetLineageResource {
@Path("table/{tableName}/outputs/graph")
@Consumes(Servlets.JSON_MEDIA_TYPE)
@Produces(Servlets.JSON_MEDIA_TYPE)
- public Response outputsGraph(@Context HttpServletRequest request, @PathParam("tableName") String tableName) {
+ public DataSetLineageResponse outputsGraph(@Context HttpServletRequest request, @PathParam("tableName") String tableName) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> DataSetLineageResource.outputsGraph({})", tableName);
}
- AtlasPerfTracer perf = null;
+ DataSetLineageResponse ret = new DataSetLineageResponse();
+ AtlasPerfTracer perf = null;
try {
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "DataSetLineageResource.outputsGraph(tableName=" + tableName + ")");
}
- final String jsonResult = lineageService.getOutputsGraph(tableName);
+ String guid = getGuid(tableName);
- JSONObject response = new JSONObject();
- response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
- response.put("tableName", tableName);
- response.put(AtlasClient.RESULTS, new JSONObject(jsonResult));
+ AtlasLineageInfo lineageInfo = atlasLineageService.getAtlasLineageInfo(guid, LineageDirection.OUTPUT, -1);
+ ret.setTableName(tableName);
+ ret.setRequestId(Servlets.getRequestId());
+ ret.setResults(LineageUtils.toLineageStruct(lineageInfo, typeRegistry));
- return Response.ok(response).build();
- } catch (EntityNotFoundException e) {
- LOG.error("table entity not found for {}", tableName);
- throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND));
- } catch (DiscoveryException | IllegalArgumentException e) {
+ return ret;
+ } catch (IllegalArgumentException e) {
LOG.error("Unable to get lineage outputs graph for table {}", tableName, e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
} catch (WebApplicationException e) {
@@ -162,30 +173,26 @@ public class DataSetLineageResource {
@Path("table/{tableName}/schema")
@Consumes(Servlets.JSON_MEDIA_TYPE)
@Produces(Servlets.JSON_MEDIA_TYPE)
- public Response schema(@Context HttpServletRequest request, @PathParam("tableName") String tableName) {
+ public SchemaResponse schema(@Context HttpServletRequest request, @PathParam("tableName") String tableName) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> DataSetLineageResource.schema({})", tableName);
}
AtlasPerfTracer perf = null;
+ SchemaResponse ret = new SchemaResponse();
try {
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "DataSetLineageResource.schema(tableName=" + tableName + ")");
}
- final String jsonResult = lineageService.getSchema(tableName);
+ SchemaDetails schemaDetails = atlasLineageService.getSchemaForHiveTableByName(tableName);
- JSONObject response = new JSONObject();
- response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
- response.put("tableName", tableName);
- response.put(AtlasClient.RESULTS, new JSONObject(jsonResult));
-
- return Response.ok(response).build();
- } catch (EntityNotFoundException e) {
- LOG.error("table entity not found for {}", tableName);
- throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND));
- } catch (DiscoveryException | IllegalArgumentException e) {
+ ret.setRequestId(Servlets.getRequestId());
+ ret.setTableName(tableName);
+ ret.setResults(schemaDetails);
+ return ret;
+ } catch (IllegalArgumentException e) {
LOG.error("Unable to get schema for table {}", tableName, e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
} catch (WebApplicationException e) {
@@ -198,4 +205,20 @@ public class DataSetLineageResource {
AtlasPerfTracer.log(perf);
}
}
+
+ private String getGuid(String tableName) throws AtlasBaseException {
+ if (StringUtils.isEmpty(tableName)) {
+ // TODO: Fix the error code if mismatch
+ throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST);
+ }
+ Map<String, Object> lookupAttributes = new HashMap<>();
+ lookupAttributes.put("qualifiedName", tableName);
+ AtlasEntityType entityType = typeRegistry.getEntityTypeByName("hive_table");
+ AtlasEntity.AtlasEntityWithExtInfo hive_table = atlasEntityStore.getByUniqueAttributes(entityType, lookupAttributes);
+ if (hive_table != null) {
+ return hive_table.getEntity().getGuid();
+ } else {
+ throw new AtlasBaseException(AtlasErrorCode.INSTANCE_NOT_FOUND, tableName);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java
index 8b56507..11879e6 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java
@@ -20,39 +20,34 @@ package org.apache.atlas.web.resources;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.sun.jersey.api.core.ResourceContext;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.CreateUpdateEntitiesResult;
import org.apache.atlas.EntityAuditEvent;
+import org.apache.atlas.discovery.AtlasDiscoveryService;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasClassification;
-import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.instance.GuidMapping;
import org.apache.atlas.model.legacy.EntityResult;
+import org.apache.atlas.repository.audit.EntityAuditRepository;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream;
import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1;
-import org.apache.atlas.services.MetadataService;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.atlas.typesystem.IStruct;
-import org.apache.atlas.typesystem.ITypedReferenceableInstance;
-import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.exception.EntityExistsException;
-import org.apache.atlas.typesystem.exception.EntityNotFoundException;
-import org.apache.atlas.typesystem.json.InstanceSerialization;
-import org.apache.atlas.typesystem.persistence.Id;
-import org.apache.atlas.typesystem.types.ValueConversionException;
+import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.atlas.utils.ParamChecker;
+import org.apache.atlas.v1.model.instance.Id;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.v1.model.instance.Struct;
import org.apache.atlas.web.rest.EntityREST;
import org.apache.atlas.web.util.Servlets;
import org.apache.commons.collections.CollectionUtils;
@@ -98,32 +93,29 @@ public class EntityResource {
private static final String TRAIT_NAME = "traitName";
- private final MetadataService metadataService;
private final AtlasInstanceConverter restAdapters;
private final AtlasEntityStore entitiesStore;
private final AtlasTypeRegistry typeRegistry;
- private final EntityREST entityREST;
+ private final EntityREST entityREST;
+ private final EntityAuditRepository entityAuditRepository;
+ private final AtlasDiscoveryService atlasDiscoveryService;
@Context
UriInfo uriInfo;
- @Context
- private ResourceContext resourceContext;
-
- /**
- * Created by the Guice ServletModule and injected with the
- * configured MetadataService.
- *
- * @param metadataService metadata service handle
- */
@Inject
- public EntityResource(MetadataService metadataService, AtlasInstanceConverter restAdapters,
- AtlasEntityStore entitiesStore, AtlasTypeRegistry typeRegistry, EntityREST entityREST) {
- this.metadataService = metadataService;
- this.restAdapters = restAdapters;
- this.entitiesStore = entitiesStore;
- this.typeRegistry = typeRegistry;
+ public EntityResource(final AtlasInstanceConverter restAdapters,
+ final AtlasEntityStore entitiesStore,
+ final AtlasTypeRegistry typeRegistry,
+ final EntityREST entityREST,
+ final EntityAuditRepository entityAuditRepository,
+ final AtlasDiscoveryService atlasDiscoveryService) {
+ this.restAdapters = restAdapters;
+ this.entitiesStore = entitiesStore;
+ this.typeRegistry = typeRegistry;
this.entityREST = entityREST;
+ this.entityAuditRepository = entityAuditRepository;
+ this.atlasDiscoveryService = atlasDiscoveryService;
}
/**
@@ -149,22 +141,28 @@ public class EntityResource {
String entities = Servlets.getRequestPayload(request);
//Handle backward compatibility - if entities is not JSONArray, convert to JSONArray
+ JSONArray jsonEntities = null;
+
try {
- new JSONArray(entities);
+ jsonEntities = new JSONArray(entities);
} catch (JSONException e) {
final String finalEntities = entities;
- entities = new JSONArray() {{
+ jsonEntities = new JSONArray() {{
put(finalEntities);
- }}.toString();
+ }};
}
- entityJson = AtlasClient.toString(new JSONArray(entities));
+ String[] jsonStrings = new String[jsonEntities.length()];
+
+ for (int i = 0; i < jsonEntities.length(); i++) {
+ jsonStrings[i] = jsonEntities.getString(i);
+ }
if (LOG.isDebugEnabled()) {
- LOG.debug("submitting entities {} ", entityJson);
+ LOG.debug("submitting entities {} ", jsonEntities);
}
- AtlasEntitiesWithExtInfo entitiesInfo = restAdapters.toAtlasEntities(entities);
+ AtlasEntitiesWithExtInfo entitiesInfo = restAdapters.toAtlasEntities(jsonStrings);
EntityMutationResponse mutationResponse = entityREST.createOrUpdate(entitiesInfo);
final List<String> guids = restAdapters.getGuids(mutationResponse.getCreatedEntities());
@@ -183,12 +181,6 @@ public class EntityResource {
} catch (AtlasBaseException e) {
LOG.error("Unable to persist entity instance entityDef={}", entityJson, e);
throw toWebApplicationException(e);
- } catch(EntityExistsException e) {
- LOG.error("Unique constraint violation for entity entityDef={}", entityJson, e);
- throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.CONFLICT));
- } catch (ValueConversionException ve) {
- LOG.error("Unable to persist entity instance due to a deserialization error entityDef={}", entityJson, ve);
- throw new WebApplicationException(Servlets.getErrorResponse(ve.getCause() != null ? ve.getCause() : ve, Response.Status.BAD_REQUEST));
} catch (AtlasException | IllegalArgumentException e) {
LOG.error("Unable to persist entity instance entityDef={}", entityJson, e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
@@ -224,13 +216,13 @@ public class EntityResource {
return locationURI;
}
- private JSONObject getResponse(EntityResult entityResult) throws AtlasException, JSONException {
+ private JSONObject getResponse(EntityResult entityResult) throws AtlasBaseException, AtlasException, JSONException {
CreateUpdateEntitiesResult result = new CreateUpdateEntitiesResult();
result.setEntityResult(entityResult);
return getResponse(result);
}
- private JSONObject getResponse(CreateUpdateEntitiesResult result) throws AtlasException, JSONException {
+ private JSONObject getResponse(CreateUpdateEntitiesResult result) throws AtlasBaseException, AtlasException, JSONException {
JSONObject response = new JSONObject();
EntityResult entityResult = result.getEntityResult();
GuidMapping mapping = result.getGuidMapping();
@@ -239,12 +231,12 @@ public class EntityResource {
response.put(AtlasClient.ENTITIES, new JSONObject(entityResult.toString()).get(AtlasClient.ENTITIES));
String sampleEntityId = getSample(result.getEntityResult());
if (sampleEntityId != null) {
- String entityDefinition = metadataService.getEntityDefinitionJson(sampleEntityId);
+ String entityDefinition = getEntityJson(sampleEntityId);
response.put(AtlasClient.DEFINITION, new JSONObject(entityDefinition));
}
}
if(mapping != null) {
- response.put(AtlasClient.GUID_ASSIGNMENTS, new JSONObject(AtlasType.toJson(mapping)).get(AtlasClient.GUID_ASSIGNMENTS));
+ response.put(AtlasClient.GUID_ASSIGNMENTS, new JSONObject(AtlasType.toV1Json(mapping)).get(AtlasClient.GUID_ASSIGNMENTS));
}
return response;
}
@@ -270,14 +262,18 @@ public class EntityResource {
}
final String entities = Servlets.getRequestPayload(request);
+ JSONArray jsonEntities = new JSONArray(entities);
+ String[] jsonStrings = new String[jsonEntities.length()];
- entityJson = AtlasClient.toString(new JSONArray(entities));
+ for (int i = 0; i < jsonEntities.length(); i++) {
+ jsonStrings[i] = jsonEntities.getString(i);
+ }
if (LOG.isDebugEnabled()) {
LOG.info("updating entities {} ", entityJson);
}
- AtlasEntitiesWithExtInfo entitiesInfo = restAdapters.toAtlasEntities(entities);
+ AtlasEntitiesWithExtInfo entitiesInfo = restAdapters.toAtlasEntities(jsonStrings);
EntityMutationResponse mutationResponse = entityREST.createOrUpdate(entitiesInfo);
CreateUpdateEntitiesResult result = restAdapters.toCreateUpdateEntitiesResult(mutationResponse);
@@ -290,12 +286,6 @@ public class EntityResource {
} catch (AtlasBaseException e) {
LOG.error("Unable to persist entity instance entityDef={}", entityJson, e);
throw toWebApplicationException(e);
- } catch(EntityExistsException e) {
- LOG.error("Unique constraint violation for entityDef={}", entityJson, e);
- throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.CONFLICT));
- } catch (ValueConversionException ve) {
- LOG.error("Unable to persist entity instance due to a deserialization error entityDef={}", entityJson, ve);
- throw new WebApplicationException(Servlets.getErrorResponse(ve.getCause(), Response.Status.BAD_REQUEST));
} catch (AtlasException | IllegalArgumentException e) {
LOG.error("Unable to persist entity instance entityDef={}", entityJson, e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
@@ -367,7 +357,7 @@ public class EntityResource {
LOG.debug("Partially updating entity by unique attribute {} {} {} {} ", entityType, attribute, value, entityJson);
}
- Referenceable updatedEntity = InstanceSerialization.fromJsonReferenceable(entityJson, true);
+ Referenceable updatedEntity = AtlasType.fromV1Json(entityJson, Referenceable.class);
entityType = ParamChecker.notEmpty(entityType, "Entity type cannot be null");
attribute = ParamChecker.notEmpty(attribute, "attribute name cannot be null");
@@ -379,10 +369,10 @@ public class EntityResource {
// update referenceable with Id if not specified in payload
Id updateId = updatedEntity.getId();
- if (updateId != null && !updateId.isAssigned()) {
+ if (updateId != null && !AtlasTypeUtil.isAssignedGuid(updateId.getId())) {
String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(getEntityType(entityType), attributes);
- updatedEntity.replaceWithNewId(new Id(guid, 0, updatedEntity.getTypeName()));
+ updatedEntity.setId(new Id(guid, 0, updatedEntity.getTypeName()));
}
AtlasEntitiesWithExtInfo entitiesInfo = restAdapters.toAtlasEntity(updatedEntity);
@@ -398,15 +388,6 @@ public class EntityResource {
} catch (AtlasBaseException e) {
LOG.error("Unable to partially update entity {} {}:{}.{}", entityJson, entityType, attribute, value, e);
throw toWebApplicationException(e);
- } catch (ValueConversionException ve) {
- LOG.error("Unable to persist entity instance due to a deserialization error {} ", entityJson, ve);
- throw new WebApplicationException(Servlets.getErrorResponse(ve.getCause(), Response.Status.BAD_REQUEST));
- } catch(EntityExistsException e) {
- LOG.error("Unique constraint violation for entity {} ", entityJson, e);
- throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.CONFLICT));
- } catch (EntityNotFoundException e) {
- LOG.error("An entity with type={} and qualifiedName={} does not exist {} ", entityType, value, entityJson, e);
- throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND));
} catch (AtlasException | IllegalArgumentException e) {
LOG.error("Unable to partially update entity {} {}:{}.{}", entityJson, entityType, attribute, value, e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
@@ -474,13 +455,13 @@ public class EntityResource {
LOG.debug("partially updating entity for guid {} : {} ", guid, entityJson);
}
- Referenceable updatedEntity = InstanceSerialization.fromJsonReferenceable(entityJson, true);
+ Referenceable updatedEntity = AtlasType.fromV1Json(entityJson, Referenceable.class);
// update referenceable with Id if not specified in payload
Id updateId = updatedEntity.getId();
- if (updateId != null && !updateId.isAssigned()) {
- updatedEntity.replaceWithNewId(new Id(guid, 0, updatedEntity.getTypeName()));
+ if (updateId != null && !AtlasTypeUtil.isAssignedGuid(updateId.getId())) {
+ updatedEntity.setId(new Id(guid, 0, updatedEntity.getTypeName()));
}
AtlasEntitiesWithExtInfo entitiesInfo = restAdapters.toAtlasEntity(updatedEntity);
@@ -496,9 +477,6 @@ public class EntityResource {
} catch (AtlasBaseException e) {
LOG.error("Unable to update entity by GUID {} {} ", guid, entityJson, e);
throw toWebApplicationException(e);
- } catch (EntityNotFoundException e) {
- LOG.error("An entity with GUID={} does not exist {} ", guid, entityJson, e);
- throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND));
} catch (AtlasException | IllegalArgumentException e) {
LOG.error("Unable to update entity by GUID {} {}", guid, entityJson, e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
@@ -544,9 +522,6 @@ public class EntityResource {
} catch (AtlasBaseException e) {
LOG.error("Unable to add property {} to entity id {} {} ", property, guid, value, e);
throw toWebApplicationException(e);
- } catch (EntityNotFoundException e) {
- LOG.error("An entity with GUID={} does not exist {} ", guid, value, e);
- throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND));
} catch (AtlasException | IllegalArgumentException e) {
LOG.error("Unable to add property {} to entity id {} {} ", property, guid, value, e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
@@ -617,13 +592,6 @@ public class EntityResource {
} catch (AtlasBaseException e) {
LOG.error("Unable to delete entities {} {} {} {} ", guids, entityType, attribute, value, e);
throw toWebApplicationException(e);
- } catch (EntityNotFoundException e) {
- if(guids != null && !guids.isEmpty()) {
- LOG.error("An entity with GUID={} does not exist ", guids, e);
- } else {
- LOG.error("An entity with qualifiedName {}-{}-{} does not exist", entityType, attribute, value, e);
- }
- throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND));
} catch (AtlasException | IllegalArgumentException e) {
LOG.error("Unable to delete entities {} {} {} {} ", guids, entityType, attribute, value, e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
@@ -666,7 +634,8 @@ public class EntityResource {
}
guid = ParamChecker.notEmpty(guid, "guid cannot be null");
- final String entityDefinition = metadataService.getEntityDefinitionJson(guid);
+
+ String entityDefinition = getEntityJson(guid);
JSONObject response = new JSONObject();
response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
@@ -681,11 +650,7 @@ public class EntityResource {
}
return Response.status(status).entity(response).build();
-
- } catch (EntityNotFoundException e) {
- LOG.error("An entity with GUID={} does not exist ", guid, e);
- throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND));
- } catch (AtlasException | IllegalArgumentException e) {
+ } catch (IllegalArgumentException e) {
LOG.error("Bad GUID={} ", guid, e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
} catch (WebApplicationException e) {
@@ -716,19 +681,19 @@ public class EntityResource {
LOG.debug("Fetching entity list for type={} ", entityType);
}
- final List<String> entityList = metadataService.getEntityList(entityType);
+ List<String> entityGUIDS = entitiesStore.getEntityGUIDS(entityType);
JSONObject response = new JSONObject();
response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
response.put(AtlasClient.TYPENAME, entityType);
- response.put(AtlasClient.RESULTS, new JSONArray(entityList));
- response.put(AtlasClient.COUNT, entityList.size());
+ response.put(AtlasClient.RESULTS, new JSONArray(entityGUIDS));
+ response.put(AtlasClient.COUNT, entityGUIDS.size());
return Response.ok(response).build();
} catch (NullPointerException e) {
LOG.error("Entity type cannot be null", e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
- } catch (AtlasException | IllegalArgumentException e) {
+ } catch (IllegalArgumentException e) {
LOG.error("Unable to get entity list for type {}", entityType, e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
} catch (WebApplicationException e) {
@@ -804,10 +769,9 @@ public class EntityResource {
String entityDefinition = null;
if (entityInfo != null) {
- AtlasEntity entity = entityInfo.getEntity();
- final ITypedReferenceableInstance instance = restAdapters.getITypedReferenceable(entity);
+ Referenceable instance = restAdapters.getReferenceable(entityInfo);
- entityDefinition = InstanceSerialization.toJson(instance, true);
+ entityDefinition = AtlasType.toV1Json(instance);
}
JSONObject response = new JSONObject();
@@ -926,8 +890,8 @@ public class EntityResource {
JSONArray traits = new JSONArray();
for (AtlasClassification classification : classifications) {
- IStruct trait = restAdapters.getTrait(classification);
- traits.put(new JSONObject(InstanceSerialization.toJson(trait, true)));
+ Struct trait = restAdapters.getTrait(classification);
+ traits.put(new JSONObject(AtlasType.toV1Json(trait)));
}
JSONObject response = new JSONObject();
@@ -984,11 +948,11 @@ public class EntityResource {
final AtlasClassification classification = entitiesStore.getClassification(guid, traitName);
- IStruct traitDefinition = restAdapters.getTrait(classification);
+ Struct traitDefinition = restAdapters.getTrait(classification);
JSONObject response = new JSONObject();
response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
- response.put(AtlasClient.RESULTS, new JSONObject(InstanceSerialization.toJson(traitDefinition, true)));
+ response.put(AtlasClient.RESULTS, new JSONObject(AtlasType.toV1Json(traitDefinition)));
return Response.ok(response).build();
@@ -1044,7 +1008,7 @@ public class EntityResource {
add(guid);
}};
- entitiesStore.addClassification(guids, restAdapters.getClassification(InstanceSerialization.fromJsonStruct(traitDefinition, true)));
+ entitiesStore.addClassification(guids, restAdapters.toAtlasClassification(AtlasType.fromV1Json(traitDefinition, Struct.class)));
URI locationURI = getLocationURI(new ArrayList<String>() {{
add(guid);
@@ -1160,13 +1124,13 @@ public class EntityResource {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntityResource.getAuditEvents(" + guid + ", " + startKey + ", " + count + ")");
}
- List<EntityAuditEvent> events = metadataService.getAuditEvents(guid, startKey, count);
+ List<EntityAuditEvent> events = entityAuditRepository.listEvents(guid, startKey, count);
JSONObject response = new JSONObject();
response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
response.put(AtlasClient.EVENTS, getJSONArray(events));
return Response.ok(response).build();
- } catch (AtlasException | IllegalArgumentException e) {
+ } catch (IllegalArgumentException e) {
LOG.error("Unable to get audit events for entity guid={} startKey={}", guid, startKey, e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
} catch (WebApplicationException e) {
@@ -1216,4 +1180,12 @@ public class EntityResource {
return new WebApplicationException(Servlets.getErrorResponse(e, e.getAtlasErrorCode().getHttpCode()));
}
+
+ private String getEntityJson(String guid) throws AtlasBaseException {
+ AtlasEntityWithExtInfo entity = entitiesStore.getById(guid);
+ Referenceable referenceable = restAdapters.getReferenceable(entity);
+ String entityJson = AtlasType.toV1Json(referenceable);
+
+ return entityJson;
+ }
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/main/java/org/apache/atlas/web/resources/LineageResource.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/LineageResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/LineageResource.java
index cba8ccf..891e4d7 100644
--- a/webapp/src/main/java/org/apache/atlas/web/resources/LineageResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/LineageResource.java
@@ -18,21 +18,16 @@
package org.apache.atlas.web.resources;
-import org.apache.atlas.AtlasClient;
import org.apache.atlas.discovery.AtlasLineageService;
-import org.apache.atlas.discovery.DiscoveryException;
-import org.apache.atlas.discovery.LineageService;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.lineage.AtlasLineageInfo;
import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.atlas.typesystem.exception.EntityNotFoundException;
-import org.apache.atlas.typesystem.exception.SchemaNotFoundException;
import org.apache.atlas.utils.AtlasPerfTracer;
+import org.apache.atlas.v1.model.lineage.LineageResponse;
+import org.apache.atlas.v1.model.lineage.SchemaResponse;
import org.apache.atlas.web.util.LineageUtils;
import org.apache.atlas.web.util.Servlets;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
@@ -56,18 +51,15 @@ public class LineageResource {
private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("rest.LineageResource");
private final AtlasLineageService atlasLineageService;
- private final LineageService lineageService;
private final AtlasTypeRegistry typeRegistry;
/**
* Created by the Guice ServletModule and injected with the
* configured LineageService.
*
- * @param lineageService lineage service handle
*/
@Inject
- public LineageResource(LineageService lineageService, AtlasLineageService atlasLineageService, AtlasTypeRegistry typeRegistry) {
- this.lineageService = lineageService;
+ public LineageResource(AtlasLineageService atlasLineageService, AtlasTypeRegistry typeRegistry) {
this.atlasLineageService = atlasLineageService;
this.typeRegistry = typeRegistry;
}
@@ -81,11 +73,13 @@ public class LineageResource {
@Path("{guid}/inputs/graph")
@Consumes(Servlets.JSON_MEDIA_TYPE)
@Produces(Servlets.JSON_MEDIA_TYPE)
- public Response inputsGraph(@PathParam("guid") String guid) {
+ public LineageResponse inputsGraph(@PathParam("guid") String guid) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> LineageResource.inputsGraph({})", guid);
}
+ LineageResponse ret = new LineageResponse();
+
AtlasPerfTracer perf = null;
try {
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
@@ -93,22 +87,16 @@ public class LineageResource {
}
AtlasLineageInfo lineageInfo = atlasLineageService.getAtlasLineageInfo(guid, LineageDirection.INPUT, -1);
- final String result = LineageUtils.toLineageStruct(lineageInfo, typeRegistry);
-
- JSONObject response = new JSONObject();
- response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
- response.put(AtlasClient.RESULTS, new JSONObject(result));
+ ret.setRequestId(Servlets.getRequestId());
+ ret.setResults(LineageUtils.toLineageStruct(lineageInfo, typeRegistry));
- return Response.ok(response).build();
+ return ret;
} catch (AtlasBaseException e) {
LOG.error("Unable to get lineage inputs graph for entity guid={}", guid, e);
throw new WebApplicationException(Servlets.getErrorResponse(e));
} catch (WebApplicationException e) {
LOG.error("Unable to get lineage inputs graph for entity guid={}", guid, e);
throw e;
- } catch (JSONException e) {
- LOG.error("Unable to get lineage inputs graph for entity guid={}", guid, e);
- throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
} finally {
AtlasPerfTracer.log(perf);
@@ -127,11 +115,13 @@ public class LineageResource {
@Path("{guid}/outputs/graph")
@Consumes(Servlets.JSON_MEDIA_TYPE)
@Produces(Servlets.JSON_MEDIA_TYPE)
- public Response outputsGraph(@PathParam("guid") String guid) {
+ public LineageResponse outputsGraph(@PathParam("guid") String guid) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> LineageResource.outputsGraph({})", guid);
}
+ LineageResponse ret = new LineageResponse();
+
AtlasPerfTracer perf = null;
try {
@@ -140,22 +130,16 @@ public class LineageResource {
}
AtlasLineageInfo lineageInfo = atlasLineageService.getAtlasLineageInfo(guid, LineageDirection.OUTPUT, -1);
- final String result = LineageUtils.toLineageStruct(lineageInfo, typeRegistry);
+ ret.setRequestId(Servlets.getRequestId());
+ ret.setResults(LineageUtils.toLineageStruct(lineageInfo, typeRegistry));
- JSONObject response = new JSONObject();
- response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
- response.put(AtlasClient.RESULTS, new JSONObject(result));
-
- return Response.ok(response).build();
+ return ret;
} catch (AtlasBaseException e) {
LOG.error("Unable to get lineage outputs graph for entity guid={}", guid, e);
throw new WebApplicationException(Servlets.getErrorResponse(e));
} catch (WebApplicationException e) {
LOG.error("Unable to get lineage outputs graph for entity guid={}", guid, e);
throw e;
- } catch (JSONException e) {
- LOG.error("Unable to get lineage outputs graph for entity guid={}", guid, e);
- throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
} finally {
AtlasPerfTracer.log(perf);
@@ -174,31 +158,26 @@ public class LineageResource {
@Path("{guid}/schema")
@Consumes(Servlets.JSON_MEDIA_TYPE)
@Produces(Servlets.JSON_MEDIA_TYPE)
- public Response schema(@PathParam("guid") String guid) {
+ public SchemaResponse schema(@PathParam("guid") String guid) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> LineageResource.schema({})", guid);
}
AtlasPerfTracer perf = null;
+ SchemaResponse ret = new SchemaResponse();
+
try {
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "LineageResource.schema(" + guid + ")");
}
- final String jsonResult = lineageService.getSchemaForEntity(guid);
+ SchemaResponse.SchemaDetails schemaDetails = atlasLineageService.getSchemaForHiveTableByGuid(guid);
- JSONObject response = new JSONObject();
- response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
- response.put(AtlasClient.RESULTS, new JSONObject(jsonResult));
- return Response.ok(response).build();
- } catch (SchemaNotFoundException e) {
- LOG.error("schema not found for {}", guid);
- throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND));
- } catch (EntityNotFoundException e) {
- LOG.error("table entity not found for {}", guid);
- throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND));
- } catch (DiscoveryException | IllegalArgumentException e) {
+ ret.setRequestId(Servlets.getRequestId());
+ ret.setResults(schemaDetails);
+ return ret;
+ } catch (IllegalArgumentException e) {
LOG.error("Unable to get schema for entity guid={}", guid, e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
} catch (WebApplicationException e) {