You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/07/26 07:37:19 UTC
atlas git commit: ATLAS-1988: added REST API to search for related
entities
Repository: atlas
Updated Branches:
refs/heads/0.8-incubating bc2d39d10 -> de60a6560
ATLAS-1988: added REST API to search for related entities
Signed-off-by: Madhan Neethiraj <ma...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/de60a656
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/de60a656
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/de60a656
Branch: refs/heads/0.8-incubating
Commit: de60a6560113e311cc46a773bedd4071c7d413fe
Parents: bc2d39d
Author: Sarath Subramanian <ss...@hortonworks.com>
Authored: Tue Jul 18 23:09:19 2017 -0700
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Wed Jul 26 00:36:31 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/atlas/AtlasErrorCode.java | 2 +
.../main/java/org/apache/atlas/SortOrder.java | 22 +++
.../model/discovery/AtlasSearchResult.java | 2 +-
.../atlas/discovery/AtlasDiscoveryService.java | 13 ++
.../atlas/discovery/EntityDiscoveryService.java | 134 +++++++++++++++++++
.../store/graph/v1/EntityGraphRetriever.java | 2 +-
.../atlas/util/AtlasGremlin2QueryProvider.java | 6 +
.../atlas/util/AtlasGremlinQueryProvider.java | 5 +-
.../apache/atlas/web/rest/DiscoveryREST.java | 39 ++++++
.../NotificationHookConsumerKafkaTest.java | 26 +++-
10 files changed, 246 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/atlas/blob/de60a656/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
index d723b2a..f267a33 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
@@ -71,6 +71,8 @@ public enum AtlasErrorCode {
BAD_REQUEST(400, "ATLAS-400-00-029", "{0}"),
PARAMETER_PARSING_FAILED(400, "ATLAS-400-00-02A", "Parameter parsing failed at: {0}"),
MISSING_MANDATORY_ATTRIBUTE(400, "ATLAS-400-00-02B", "Mandatory field {0}.{1} has empty/null value"),
+ // {0} = requested attribute name, {1} = the type name actually found for that attribute
+ INVALID_RELATIONSHIP_ATTRIBUTE(400, "ATLAS-400-00-02C", "Expected attribute {0} to be a relationship but found type {1}"),
+ INVALID_RELATIONSHIP_TYPE(400, "ATLAS-400-00-02D", "Invalid entity type '{0}', guid '{1}' in relationship search"),
// All Not found enums go here
TYPE_NAME_NOT_FOUND(404, "ATLAS-404-00-001", "Given typename {0} was invalid"),
http://git-wip-us.apache.org/repos/asf/atlas/blob/de60a656/intg/src/main/java/org/apache/atlas/SortOrder.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/SortOrder.java b/intg/src/main/java/org/apache/atlas/SortOrder.java
new file mode 100644
index 0000000..e3eef4e
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/SortOrder.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas;
+
+/**
+ * Direction in which a sorted result set is ordered.
+ */
+public enum SortOrder {
+    ASCENDING,
+    DESCENDING
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/atlas/blob/de60a656/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java b/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java
index 5827440..0c32e01 100644
--- a/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java
+++ b/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java
@@ -185,7 +185,7 @@ public class AtlasSearchResult implements Serializable {
'}';
}
- public enum AtlasQueryType { DSL, FULL_TEXT, GREMLIN, BASIC, ATTRIBUTE }
+ public enum AtlasQueryType { DSL, FULL_TEXT, GREMLIN, BASIC, ATTRIBUTE, RELATIONSHIP }
@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
http://git-wip-us.apache.org/repos/asf/atlas/blob/de60a656/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java
index 764b548..ead5d3c 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java
@@ -19,6 +19,7 @@
package org.apache.atlas.discovery;
+import org.apache.atlas.SortOrder;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.AtlasSearchResult;
import org.apache.atlas.model.discovery.SearchParameters;
@@ -65,4 +66,16 @@ public interface AtlasDiscoveryService {
* @throws AtlasBaseException
*/
AtlasSearchResult searchWithParameters(SearchParameters searchParameters) throws AtlasBaseException;
+
+ /**
+ * Search for the entities that are related to the given entity through the given relation.
+ *
+ * @param guid unique ID of the entity whose related entities are searched.
+ * @param relation relation name.
+ * @param sortByAttribute sort the result using this attribute name, default value is 'name'
+ * @param sortOrder sorting order
+ * @param limit number of resultant rows (for pagination). [ limit > 0 ] and [ limit < maxlimit ]. -1 maps to atlas.search.defaultlimit property.
+ * @param offset offset to the results returned (for pagination). [ offset >= 0 ]. -1 maps to offset 0.
+ * @return AtlasSearchResult
+ * @throws AtlasBaseException
+ */
+ AtlasSearchResult searchRelatedEntities(String guid, String relation, String sortByAttribute, SortOrder sortOrder, int limit, int offset) throws AtlasBaseException;
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/de60a656/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
index b183c72..2a24782 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
@@ -21,6 +21,7 @@ import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
+import org.apache.atlas.SortOrder;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy;
import org.apache.atlas.exception.AtlasBaseException;
@@ -67,6 +68,7 @@ import scala.util.Either;
import scala.util.parsing.combinator.Parsers.NoSuccess;
import javax.inject.Inject;
+import javax.script.Bindings;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.util.*;
@@ -74,10 +76,19 @@ import java.util.*;
import static org.apache.atlas.AtlasErrorCode.CLASSIFICATION_NOT_FOUND;
import static org.apache.atlas.AtlasErrorCode.DISCOVERY_QUERY_FAILED;
import static org.apache.atlas.AtlasErrorCode.UNKNOWN_TYPENAME;
+import static org.apache.atlas.SortOrder.DESCENDING;
+import static org.apache.atlas.model.TypeCategory.ARRAY;
+import static org.apache.atlas.model.TypeCategory.MAP;
+import static org.apache.atlas.model.TypeCategory.OBJECT_ID_TYPE;
+import static org.apache.atlas.repository.graph.GraphHelper.EDGE_LABEL_PREFIX;
+import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.RELATIONSHIP_SEARCH;
+import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.RELATIONSHIP_SEARCH_DESCENDING_SORT;
+import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.RELATIONSHIP_SEARCH_ASCENDING_SORT;
@Component
public class EntityDiscoveryService implements AtlasDiscoveryService {
private static final Logger LOG = LoggerFactory.getLogger(EntityDiscoveryService.class);
+ private static final String DEFAULT_SORT_ATTRIBUTE_NAME = "name";
private final AtlasGraph graph;
private final DefaultGraphPersistenceStrategy graphPersistenceStrategy;
@@ -485,6 +496,98 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
return ret;
}
+    /**
+     * Searches for the entities related to the entity identified by {@code guid} through {@code relation}.
+     *
+     * If {@code relation} is the name of an attribute of the entity's type, that attribute must be a
+     * relationship attribute and is translated to its edge label; otherwise {@code relation} is passed
+     * to the query unchanged. When the sort attribute (default 'name') is not an attribute of the
+     * entity's type, the unsorted query is used.
+     *
+     * @param guid                unique ID of the entity whose related entities are searched
+     * @param relation            relationship attribute name, or a value used as-is as the relation in the query
+     * @param sortByAttributeName attribute to sort by; empty/null selects 'name'
+     * @param sortOrder           sorting order; null selects ascending when a sort attribute is resolved
+     * @param limit               maximum number of results (validated by validateSearchParams)
+     * @param offset              start offset of the results (validated by validateSearchParams)
+     * @return search result of type RELATIONSHIP; its entity list is never null
+     * @throws AtlasBaseException if guid or relation is empty, the entity's type is not registered,
+     *                            relation names a non-relationship attribute, or the query fails
+     */
+    @Override
+    @GraphTransaction
+    public AtlasSearchResult searchRelatedEntities(String guid, String relation, String sortByAttributeName,
+                                                   SortOrder sortOrder, int limit, int offset) throws AtlasBaseException {
+        AtlasSearchResult ret = new AtlasSearchResult(AtlasQueryType.RELATIONSHIP);
+
+        if (StringUtils.isEmpty(guid) || StringUtils.isEmpty(relation)) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "guid: '" + guid + "', relation: '" + relation + "'");
+        }
+
+        AtlasVertex     entityVertex   = entityRetriever.getEntityVertex(guid);
+        String          entityTypeName = GraphHelper.getTypeName(entityVertex);
+        AtlasEntityType entityType     = typeRegistry.getEntityTypeByName(entityTypeName);
+
+        if (entityType == null) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_RELATIONSHIP_TYPE, entityTypeName, guid);
+        }
+
+        AtlasAttribute attribute = entityType.getAttribute(relation);
+
+        if (attribute != null) {
+            if (isRelationshipAttribute(attribute)) {
+                relation = EDGE_LABEL_PREFIX + attribute.getQualifiedName();
+            } else {
+                throw new AtlasBaseException(AtlasErrorCode.INVALID_RELATIONSHIP_ATTRIBUTE, relation, attribute.getTypeName());
+            }
+        }
+
+        if (StringUtils.isEmpty(sortByAttributeName)) {
+            sortByAttributeName = DEFAULT_SORT_ATTRIBUTE_NAME;
+        }
+
+        AtlasAttribute sortByAttribute = entityType.getAttribute(sortByAttributeName);
+
+        if (sortByAttribute == null) {
+            // unknown sort attribute: fall back to the unsorted query
+            sortByAttributeName = null;
+            sortOrder           = null;
+        } else {
+            sortByAttributeName = sortByAttribute.getQualifiedName();
+
+            if (sortOrder == null) {
+                sortOrder = SortOrder.ASCENDING;
+            }
+        }
+
+        // Resolve everything that does not need the script engine first, so that the engine is
+        // only held inside the try/finally that releases it.
+        String       relatedEntitiesQuery = getRelatedEntitiesQuery(sortOrder);
+        QueryParams  params               = validateSearchParams(limit, offset);
+        ScriptEngine scriptEngine         = graph.getGremlinScriptEngine();
+
+        try {
+            Bindings bindings = scriptEngine.createBindings();
+
+            bindings.put("g", graph);
+            bindings.put("guid", guid);
+            bindings.put("relation", relation);
+            bindings.put("sortAttributeName", sortByAttributeName);
+            bindings.put("offset", params.offset());
+            // the query uses "limit" as the exclusive end index of the range, hence offset + limit
+            bindings.put("limit", params.offset() + params.limit());
+
+            Object result = graph.executeGremlinScript(scriptEngine, bindings, relatedEntitiesQuery, false);
+
+            if (result instanceof List && CollectionUtils.isNotEmpty((List<?>) result)) {
+                List<?> queryResult = (List<?>) result;
+
+                if (queryResult.get(0) instanceof AtlasVertex) {
+                    List<AtlasEntityHeader> resultList = new ArrayList<>(queryResult.size());
+
+                    for (Object element : queryResult) {
+                        resultList.add(entityRetriever.toAtlasEntityHeader((AtlasVertex) element));
+                    }
+
+                    ret.setEntities(resultList);
+                }
+            }
+
+            if (ret.getEntities() == null) {
+                ret.setEntities(new ArrayList<AtlasEntityHeader>());
+            }
+        } catch (ScriptException e) {
+            // keep the failure visible and preserve the cause for the caller
+            LOG.error("Gremlin script execution failed for relationship search query: guid={}, relation={}", guid, relation, e);
+
+            throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e, "Relationship search query failed");
+        } finally {
+            graph.releaseGremlinScriptEngine(scriptEngine);
+        }
+
+        return ret;
+    }
+
public int getMaxResultSetSize() {
return maxResultSetSize;
}
@@ -640,4 +743,35 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
return "";
}
+
+    /**
+     * Tells whether the attribute refers to other entities, i.e. whether its type is an
+     * object-id type. For array attributes the element type is examined, for map attributes
+     * the value type.
+     *
+     * @param attribute attribute to inspect
+     * @return true when the (element/value) type category is OBJECT_ID_TYPE
+     */
+    private boolean isRelationshipAttribute(AtlasAttribute attribute) throws AtlasBaseException {
+        AtlasType referredType = attribute.getAttributeType();
+
+        // unwrap one level of container to look at what the attribute actually holds
+        if (referredType.getTypeCategory() == ARRAY) {
+            referredType = ((AtlasArrayType) referredType).getElementType();
+        } else if (referredType.getTypeCategory() == MAP) {
+            referredType = ((AtlasMapType) referredType).getValueType();
+        }
+
+        return referredType.getTypeCategory() == OBJECT_ID_TYPE;
+    }
+
+    /**
+     * Picks the Gremlin query for the relationship search.
+     *
+     * @param sortOrder requested sort order; null selects the unsorted query
+     * @return query text obtained from the Gremlin query provider
+     */
+    private String getRelatedEntitiesQuery(SortOrder sortOrder) {
+        if (sortOrder == null) {
+            return gremlinQueryProvider.getQuery(RELATIONSHIP_SEARCH);
+        }
+
+        return gremlinQueryProvider.getQuery(sortOrder == DESCENDING ? RELATIONSHIP_SEARCH_DESCENDING_SORT
+                                                                     : RELATIONSHIP_SEARCH_ASCENDING_SORT);
+    }
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/de60a656/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
index b8e0227..f6c8c72 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
@@ -131,7 +131,7 @@ public final class EntityGraphRetriever {
return toAtlasEntityHeader(entityVertex, Collections.<String>emptySet());
}
- private AtlasVertex getEntityVertex(String guid) throws AtlasBaseException {
+ public AtlasVertex getEntityVertex(String guid) throws AtlasBaseException {
AtlasVertex ret = AtlasGraphUtilsV1.findByGuid(guid);
if (ret == null) {
http://git-wip-us.apache.org/repos/asf/atlas/blob/de60a656/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java
index 1bf0346..a61bb66 100644
--- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java
@@ -75,6 +75,12 @@ public class AtlasGremlin2QueryProvider extends AtlasGremlinQueryProvider {
return " [startIdx..<endIdx].toList()";
case GUID_PREFIX_FILTER:
return ".filter{it.'__guid'.matches(guid)}";
+ case RELATIONSHIP_SEARCH:
+ return "g.V('__guid', guid).both(relation)[offset..<limit].toList()";
+ // the order step comes before the [offset..<limit] range so that a page is cut from the fully sorted result
+ case RELATIONSHIP_SEARCH_DESCENDING_SORT:
+ return "g.V('__guid', guid).both(relation).order{it.b.getProperty(sortAttributeName) <=> it.a.getProperty(sortAttributeName)}[offset..<limit].toList()";
+ case RELATIONSHIP_SEARCH_ASCENDING_SORT:
+ return "g.V('__guid', guid).both(relation).order{it.a.getProperty(sortAttributeName) <=> it.b.getProperty(sortAttributeName)}[offset..<limit].toList()";
}
// Should never reach this point
return null;
http://git-wip-us.apache.org/repos/asf/atlas/blob/de60a656/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java
index 8481a4f..def4733 100644
--- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java
@@ -60,6 +60,9 @@ public abstract class AtlasGremlinQueryProvider {
BASIC_SEARCH_CLASSIFICATION_FILTER,
BASIC_SEARCH_STATE_FILTER,
TO_RANGE_LIST,
- GUID_PREFIX_FILTER
+ GUID_PREFIX_FILTER,
+ RELATIONSHIP_SEARCH,
+ RELATIONSHIP_SEARCH_ASCENDING_SORT,
+ RELATIONSHIP_SEARCH_DESCENDING_SORT
}
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/de60a656/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java
index efab72a..86f618c 100644
--- a/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java
+++ b/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java
@@ -18,6 +18,7 @@
package org.apache.atlas.web.rest;
import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.SortOrder;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.discovery.AtlasDiscoveryService;
import org.apache.atlas.model.discovery.AtlasSearchResult;
@@ -255,6 +256,44 @@ public class DiscoveryREST {
}
}
+ /**
+ * Relationship search to search for related entities satisfying the search parameters
+ * @param guid unique ID of the entity whose related entities are searched
+ * @param relation relationName
+ * @param sortByAttribute sort the result using this attribute name, default value is 'name'
+ * @param sortOrder sorting order
+ * @param limit limit the result set to only include the specified number of entries
+ * @param offset start offset of the result set (useful for pagination)
+ * @return Atlas search result
+ * @throws AtlasBaseException
+ *
+ * @HTTP 200 On successful search
+ * @HTTP 400 the type of the entity identified by guid is not a known entity type, or relation names an attribute that is not a relationship attribute
+ */
+ @GET
+ @Path("relationship")
+ @Consumes(Servlets.JSON_MEDIA_TYPE)
+ @Produces(Servlets.JSON_MEDIA_TYPE)
+ public AtlasSearchResult searchRelatedEntities(@QueryParam("guid") String guid,
+ @QueryParam("relation") String relation,
+ @QueryParam("sortBy") String sortByAttribute,
+ @QueryParam("sortOrder") SortOrder sortOrder,
+ @QueryParam("limit") int limit,
+ @QueryParam("offset") int offset) throws AtlasBaseException {
+ AtlasPerfTracer perf = null;
+
+ try {
+ // the tracer is only created when perf tracing is enabled; otherwise perf stays null
+ // NOTE(review): the tracer label says relatedEntitiesSearchUsingGremlin while the method is searchRelatedEntities - confirm whether the label should match
+ if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+ perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "DiscoveryREST.relatedEntitiesSearchUsingGremlin(" + guid +
+ ", " + relation + ", " + sortByAttribute + ", " + sortOrder + ", " + limit + ", " + offset + ")");
+ }
+
+ return atlasDiscoveryService.searchRelatedEntities(guid, relation, sortByAttribute, sortOrder, limit, offset);
+ } finally {
+ AtlasPerfTracer.log(perf);
+ }
+ }
+
private boolean isEmpty(SearchParameters.FilterCriteria filterCriteria) {
return filterCriteria == null ||
(StringUtils.isEmpty(filterCriteria.getAttributeName()) && CollectionUtils.isEmpty(filterCriteria.getCriterion()));
http://git-wip-us.apache.org/repos/asf/atlas/blob/de60a656/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index eb37fa8..1a3c413 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -100,8 +100,30 @@ public class NotificationHookConsumerKafkaTest {
produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity()));
NotificationConsumer<HookNotificationMessage> consumer = createNewConsumer(kafkaNotification, false);
- NotificationHookConsumer notificationHookConsumer =
- new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+ NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
+
+ consumeOneMessage(consumer, hookConsumer);
+ verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean());
+
+ // produce another message, and make sure it moves ahead. If commit succeeded, this would work.
+ produceMessage(new HookNotification.EntityCreateRequest("test_user2", createEntity()));
+ consumeOneMessage(consumer, hookConsumer);
+ verify(atlasEntityStore,times(2)).createOrUpdate(any(EntityStream.class), anyBoolean());
+ reset(atlasEntityStore);
+ }
+ finally {
+ kafkaNotification.close();
+ }
+ }
+
+ @Test
+ public void testConsumerConsumesNewMessageWithAutoCommitDisabled1() throws AtlasException, InterruptedException, AtlasBaseException {
+ try {
+ produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity()));
+
+ NotificationConsumer<HookNotificationMessage> consumer = createNewConsumer(kafkaNotification, false);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
consumeOneMessage(consumer, hookConsumer);