You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by pi...@apache.org on 2021/05/25 16:42:10 UTC
[atlas] branch master updated: ATLAS-4254 : Basic Search : Optimize
pagination
This is an automated email from the ASF dual-hosted git repository.
pinal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 35419d6 ATLAS-4254 : Basic Search : Optimize pagination
35419d6 is described below
commit 35419d6c5a8b1b2bbba60e2f6d6fdf19efd8949b
Author: Pinal <pinal-shah>
AuthorDate: Mon May 10 21:55:10 2021 +0530
ATLAS-4254 : Basic Search : Optimize pagination
Signed-off-by: Pinal <pinal-shah>
---
.../atlas/model/discovery/AtlasSearchResult.java | 11 +-
.../atlas/model/discovery/SearchParameters.java | 20 +-
.../discovery/ClassificationSearchProcessor.java | 81 ++++--
.../atlas/discovery/EntityDiscoveryService.java | 5 +
.../atlas/discovery/EntitySearchProcessor.java | 79 +++---
.../atlas/discovery/FreeTextSearchProcessor.java | 28 +-
.../atlas/discovery/FullTextSearchProcessor.java | 25 +-
.../org/apache/atlas/discovery/SearchContext.java | 91 ++++++-
.../apache/atlas/discovery/SearchProcessor.java | 83 ++++--
.../atlas/discovery/TermSearchProcessor.java | 52 ++--
.../atlas/discovery/AtlasDiscoveryServiceTest.java | 301 ++++++++++++++-------
.../ClassificationSearchProcessorTest.java | 57 ++++
.../org/apache/atlas/web/rest/DiscoveryREST.java | 4 +-
13 files changed, 603 insertions(+), 234 deletions(-)
diff --git a/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java b/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java
index e1c550e..ce0f84b 100644
--- a/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java
+++ b/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java
@@ -53,6 +53,7 @@ public class AtlasSearchResult implements Serializable {
private List<AtlasFullTextResult> fullTextResult;
private Map<String, AtlasEntityHeader> referredEntities;
private long approximateCount = -1;
+ private String nextMarker;
public AtlasSearchResult() {}
@@ -131,8 +132,12 @@ public class AtlasSearchResult implements Serializable {
public void setApproximateCount(long approximateCount) { this.approximateCount = approximateCount; }
+ public String getNextMarker() { return nextMarker; }
+
+ public void setNextMarker(String nextMarker) { this.nextMarker = nextMarker; }
+
@Override
- public int hashCode() { return Objects.hash(queryType, searchParameters, queryText, type, classification, entities, attributes, fullTextResult, referredEntities); }
+ public int hashCode() { return Objects.hash(queryType, searchParameters, queryText, type, classification, entities, attributes, fullTextResult, referredEntities, nextMarker); }
@Override
public boolean equals(Object o) {
@@ -147,7 +152,8 @@ public class AtlasSearchResult implements Serializable {
Objects.equals(entities, that.entities) &&
Objects.equals(attributes, that.attributes) &&
Objects.equals(fullTextResult, that.fullTextResult) &&
- Objects.equals(referredEntities, that.referredEntities);
+ Objects.equals(referredEntities, that.referredEntities) &&
+ Objects.equals(nextMarker, that.nextMarker);
}
public void addEntity(AtlasEntityHeader newEntity) {
@@ -190,6 +196,7 @@ public class AtlasSearchResult implements Serializable {
", fullTextResult=" + fullTextResult +
", referredEntities=" + referredEntities +
", approximateCount=" + approximateCount +
+ ", nextMarker=" + nextMarker +
'}';
}
diff --git a/intg/src/main/java/org/apache/atlas/model/discovery/SearchParameters.java b/intg/src/main/java/org/apache/atlas/model/discovery/SearchParameters.java
index 9d2cd4f..78fb4a4 100644
--- a/intg/src/main/java/org/apache/atlas/model/discovery/SearchParameters.java
+++ b/intg/src/main/java/org/apache/atlas/model/discovery/SearchParameters.java
@@ -52,6 +52,7 @@ public class SearchParameters implements Serializable {
private boolean includeSubClassifications = true;
private int limit;
private int offset;
+ private String marker;
private FilterCriteria entityFilters;
private FilterCriteria tagFilters;
@@ -216,6 +217,16 @@ public class SearchParameters implements Serializable {
}
/**
+ * @return marker (offset of the next page)
+ */
+ public String getMarker() { return marker; }
+
+ /**
+ * @param marker
+ */
+ public void setMarker(String marker) { this.marker = marker; }
+
+ /**
* Entity attribute filters for the type (if type name is specified)
* @return
*/
@@ -294,6 +305,8 @@ public class SearchParameters implements Serializable {
SearchParameters that = (SearchParameters) o;
return excludeDeletedEntities == that.excludeDeletedEntities &&
includeClassificationAttributes == that.includeClassificationAttributes &&
+ includeSubTypes == that.includeSubTypes &&
+ includeSubClassifications == that.includeSubClassifications &&
limit == that.limit &&
offset == that.offset &&
Objects.equals(query, that.query) &&
@@ -309,8 +322,9 @@ public class SearchParameters implements Serializable {
@Override
public int hashCode() {
- return Objects.hash(query, typeName, classification, termName, excludeDeletedEntities, includeClassificationAttributes,
- limit, offset, entityFilters, tagFilters, attributes, sortBy, sortOrder);
+ return Objects.hash(query, typeName, classification, termName, includeSubTypes, includeSubClassifications,
+ excludeDeletedEntities, includeClassificationAttributes, limit, offset, entityFilters,
+ tagFilters, attributes, sortBy, sortOrder);
}
public StringBuilder toString(StringBuilder sb) {
@@ -323,6 +337,8 @@ public class SearchParameters implements Serializable {
sb.append(", typeName='").append(typeName).append('\'');
sb.append(", classification='").append(classification).append('\'');
sb.append(", termName='").append(termName).append('\'');
+ sb.append(", includeSubTypes='").append(includeSubTypes).append('\'');
+ sb.append(", includeSubClassifications='").append(includeSubClassifications).append('\'');
sb.append(", excludeDeletedEntities=").append(excludeDeletedEntities);
sb.append(", includeClassificationAttributes=").append(includeClassificationAttributes);
sb.append(", limit=").append(limit);
diff --git a/repository/src/main/java/org/apache/atlas/discovery/ClassificationSearchProcessor.java b/repository/src/main/java/org/apache/atlas/discovery/ClassificationSearchProcessor.java
index 647ff9c..dfcc441 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/ClassificationSearchProcessor.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/ClassificationSearchProcessor.java
@@ -27,6 +27,7 @@ import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.util.SearchPredicateUtil;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.collections.Predicate;
import org.apache.commons.collections.PredicateUtils;
import org.apache.commons.lang3.StringUtils;
@@ -212,26 +213,32 @@ public class ClassificationSearchProcessor extends SearchProcessor {
}
try {
- final int startIdx = context.getSearchParameters().getOffset();
final int limit = context.getSearchParameters().getLimit();
+ Integer marker = context.getMarker();
// query to start at 0, even though startIdx can be higher - because few results in earlier retrieval could
// have been dropped: like non-active-entities or duplicate-entities (same entity pointed to by multiple
// classifications in the result)
//
// first 'startIdx' number of entries will be ignored
- int qryOffset = 0;
+ //marker functionality will not work when there is need to fetch classificationVertices and get entities from it
+ if (indexQuery == null) {
+ marker = null;
+ }
+ // if marker is provided, start query with marker offset
+ int startIdx = marker != null ? marker : context.getSearchParameters().getOffset();
+ int qryOffset = marker != null ? marker : 0;
int resultIdx = qryOffset;
- final Set<String> processedGuids = new HashSet<>();
- final List<AtlasVertex> entityVertices = new ArrayList<>();
- final List<AtlasVertex> classificationVertices = new ArrayList<>();
+ final Set<String> processedGuids = new HashSet<>();
+ LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap = new LinkedHashMap<>();
+ final List<AtlasVertex> classificationVertices = new ArrayList<>();
final String sortBy = context.getSearchParameters().getSortBy();
final SortOrder sortOrder = context.getSearchParameters().getSortOrder();
for (; ret.size() < limit; qryOffset += limit) {
- entityVertices.clear();
+ offsetEntityVertexMap.clear();
classificationVertices.clear();
if (context.terminateSearch()) {
@@ -251,12 +258,12 @@ public class ClassificationSearchProcessor extends SearchProcessor {
queryResult = indexQuery.vertices(qryOffset, limit);
}
- getVerticesFromIndexQueryResult(queryResult, entityVertices);
- isLastResultPage = entityVertices.size() < limit;
+ offsetEntityVertexMap = getVerticesFromIndexQueryResult(queryResult, offsetEntityVertexMap, qryOffset);
+ isLastResultPage = offsetEntityVertexMap.size() < limit;
// Do in-memory filtering
- CollectionUtils.filter(entityVertices, traitPredicate);
- CollectionUtils.filter(entityVertices, isEntityPredicate);
+ offsetEntityVertexMap = super.filter(offsetEntityVertexMap, traitPredicate);
+ offsetEntityVertexMap = super.filter(offsetEntityVertexMap, isEntityPredicate);
} else {
if (classificationIndexQuery != null) {
@@ -283,11 +290,14 @@ public class ClassificationSearchProcessor extends SearchProcessor {
// Since tag filters are present, we need to collect the entity vertices after filtering the classification
// vertex results (as these might be lower in number)
if (CollectionUtils.isNotEmpty(classificationVertices)) {
+ int resultCount = 0;
+
for (AtlasVertex classificationVertex : classificationVertices) {
Iterable<AtlasEdge> edges = classificationVertex.getEdges(AtlasEdgeDirection.IN, Constants.CLASSIFICATION_LABEL);
for (AtlasEdge edge : edges) {
AtlasVertex entityVertex = edge.getOutVertex();
+ resultCount++;
String guid = AtlasGraphUtilsV2.getIdFromVertex(entityVertex);
@@ -295,7 +305,7 @@ public class ClassificationSearchProcessor extends SearchProcessor {
continue;
}
- entityVertices.add(entityVertex);
+ offsetEntityVertexMap.put((qryOffset + resultCount) - 1, entityVertex);
processedGuids.add(guid);
}
@@ -303,22 +313,28 @@ public class ClassificationSearchProcessor extends SearchProcessor {
}
if (whiteSpaceFilter) {
- filterWhiteSpaceClassification(entityVertices);
+ offsetEntityVertexMap = filterWhiteSpaceClassification(offsetEntityVertexMap);
}
- // Do in-memory filtering
- CollectionUtils.filter(entityVertices, isEntityPredicate);
+ // Do in-memory filtering
+ offsetEntityVertexMap = super.filter(offsetEntityVertexMap, isEntityPredicate);
if (activePredicate != null) {
- CollectionUtils.filter(entityVertices, activePredicate);
+ offsetEntityVertexMap = super.filter(offsetEntityVertexMap, activePredicate);
}
- super.filter(entityVertices);
+ offsetEntityVertexMap = super.filter(offsetEntityVertexMap);
- resultIdx = collectResultVertices(ret, startIdx, limit, resultIdx, entityVertices);
+ resultIdx = collectResultVertices(ret, startIdx, limit, resultIdx, offsetEntityVertexMap, marker);
if (isLastResultPage) {
+ resultIdx = SearchContext.MarkerUtil.MARKER_END - 1;
break;
}
}
+
+ if (marker != null) {
+ nextOffset = resultIdx + 1;
+ }
+
} finally {
AtlasPerfTracer.log(perf);
}
@@ -331,20 +347,23 @@ public class ClassificationSearchProcessor extends SearchProcessor {
}
@Override
- public void filter(List<AtlasVertex> entityVertices) {
+ public LinkedHashMap<Integer, AtlasVertex> filter(LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap) {
if (LOG.isDebugEnabled()) {
- LOG.debug("==> ClassificationSearchProcessor.filter({})", entityVertices.size());
+ LOG.debug("==> ClassificationSearchProcessor.filter({})", offsetEntityVertexMap.size());
}
if (inMemoryPredicate != null) {
//in case of classification type + index attributes
- CollectionUtils.filter(entityVertices, traitPredicate);
+ offsetEntityVertexMap = super.filter(offsetEntityVertexMap, traitPredicate);
//filter attributes (filterCriteria). Find classification vertex(typeName = classification) from entity vertex (traitName = classification)
final Set<String> processedGuids = new HashSet<>();
- List<AtlasVertex> matchEntityVertices = new ArrayList<>();
- if (CollectionUtils.isNotEmpty(entityVertices)) {
- for (AtlasVertex entityVertex : entityVertices) {
+ LinkedHashMap<Integer, AtlasVertex> matchEntityVertices = new LinkedHashMap<>();
+
+ if (MapUtils.isNotEmpty(offsetEntityVertexMap)) {
+ for (Map.Entry<Integer, AtlasVertex> offsetToEntity : offsetEntityVertexMap.entrySet()) {
+
+ AtlasVertex entityVertex = offsetToEntity.getValue();
Iterable<AtlasEdge> edges = entityVertex.getEdges(AtlasEdgeDirection.OUT, Constants.CLASSIFICATION_LABEL);
for (AtlasEdge edge : edges) {
@@ -358,7 +377,7 @@ public class ClassificationSearchProcessor extends SearchProcessor {
continue;
}
- matchEntityVertices.add(entityVertex);
+ matchEntityVertices.put(offsetToEntity.getKey(), entityVertex);
processedGuids.add(guid);
break;
@@ -366,20 +385,22 @@ public class ClassificationSearchProcessor extends SearchProcessor {
}
}
}
- entityVertices.clear();
- entityVertices.addAll(matchEntityVertices);
+ offsetEntityVertexMap.clear();
+ offsetEntityVertexMap.putAll(matchEntityVertices);
} else {
//in case of only classsification type
- CollectionUtils.filter(entityVertices, traitPredicate);
- CollectionUtils.filter(entityVertices, isEntityPredicate);
+ offsetEntityVertexMap = super.filter(offsetEntityVertexMap, traitPredicate);
+ offsetEntityVertexMap = super.filter(offsetEntityVertexMap, isEntityPredicate);
}
- super.filter(entityVertices);
+ offsetEntityVertexMap = super.filter(offsetEntityVertexMap);
if (LOG.isDebugEnabled()) {
- LOG.debug("<== ClassificationSearchProcessor.filter(): ret.size()={}", entityVertices.size());
+ LOG.debug("<== ClassificationSearchProcessor.filter(): ret.size()={}", offsetEntityVertexMap.size());
}
+
+ return offsetEntityVertexMap;
}
@Override
diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
index a3ab6e3..f2290c6 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
@@ -463,6 +463,11 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
ret.setApproximateCount(searchContext.getSearchProcessor().getResultCount());
+ String nextMarker = searchContext.getSearchProcessor().getNextMarker();
+ if (StringUtils.isNotEmpty(nextMarker)) {
+ ret.setNextMarker(nextMarker);
+ }
+
// By default any attribute that shows up in the search parameter should be sent back in the response
// If additional values are requested then the entityAttributes will be a superset of the all search attributes
// and the explicitly requested attribute(s)
diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntitySearchProcessor.java b/repository/src/main/java/org/apache/atlas/discovery/EntitySearchProcessor.java
index 5dcff3b..f45ccaf 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/EntitySearchProcessor.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/EntitySearchProcessor.java
@@ -240,32 +240,26 @@ public class EntitySearchProcessor extends SearchProcessor {
}
try {
- final int startIdx = context.getSearchParameters().getOffset();
- final int limit = context.getSearchParameters().getLimit();
+ final int limit = context.getSearchParameters().getLimit();
+ final Integer marker = context.getMarker();
+ final int startIdx = marker != null ? marker : context.getSearchParameters().getOffset();
// when subsequent filtering stages are involved, query should start at 0 even though startIdx can be higher
//
// first 'startIdx' number of entries will be ignored
- int qryOffset = (nextProcessor != null || (graphQuery != null && indexQuery != null)) ? 0 : startIdx;
- int resultIdx = qryOffset;
-
- final List<AtlasVertex> entityVertices = new ArrayList<>();
-
- SortOrder sortOrder = context.getSearchParameters().getSortOrder();
- String sortBy = context.getSearchParameters().getSortBy();
-
- final AtlasEntityType entityType = context.getEntityTypes().iterator().next();
- AtlasAttribute sortByAttribute = entityType.getAttribute(sortBy);
- if (sortByAttribute == null) {
- sortBy = null;
+ // if marker is provided, start query with marker offset
+ int qryOffset;
+ if (marker != null) {
+ qryOffset = marker;
} else {
- sortBy = sortByAttribute.getVertexPropertyName();
+ qryOffset = (nextProcessor != null || (graphQuery != null && indexQuery != null)) ? 0 : startIdx;
}
+ int resultIdx = qryOffset;
- if (sortOrder == null) { sortOrder = ASCENDING; }
+ LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap = new LinkedHashMap<>();
for (; ret.size() < limit; qryOffset += limit) {
- entityVertices.clear();
+ offsetEntityVertexMap.clear();
if (context.terminateSearch()) {
LOG.warn("query terminated: {}", context.getSearchParameters());
@@ -277,41 +271,36 @@ public class EntitySearchProcessor extends SearchProcessor {
if (indexQuery != null) {
Iterator<AtlasIndexQuery.Result> idxQueryResult = executeIndexQuery(context, indexQuery, qryOffset, limit);
+ offsetEntityVertexMap = getVerticesFromIndexQueryResult(idxQueryResult, offsetEntityVertexMap, qryOffset);
- getVerticesFromIndexQueryResult(idxQueryResult, entityVertices);
-
- isLastResultPage = entityVertices.size() < limit;
-
- // Do in-memory filtering before the graph query
- CollectionUtils.filter(entityVertices, inMemoryPredicate);
-
- if (graphQueryPredicate != null) {
- CollectionUtils.filter(entityVertices, graphQueryPredicate);
- }
} else {
Iterator<AtlasVertex> queryResult = graphQuery.vertices(qryOffset, limit).iterator();
+ offsetEntityVertexMap = getVertices(queryResult, offsetEntityVertexMap, qryOffset);
+ }
- getVertices(queryResult, entityVertices);
-
- isLastResultPage = entityVertices.size() < limit;
-
- // Do in-memory filtering
- CollectionUtils.filter(entityVertices, inMemoryPredicate);
+ isLastResultPage = offsetEntityVertexMap.size() < limit;
+ // Do in-memory filtering
+ offsetEntityVertexMap = super.filter(offsetEntityVertexMap, inMemoryPredicate);
- //incase when operator is NEQ in pipeSeperatedSystemAttributes
- if (graphQueryPredicate != null) {
- CollectionUtils.filter(entityVertices, graphQueryPredicate);
- }
+ //incase when operator is NEQ in pipeSeperatedSystemAttributes
+ if (graphQueryPredicate != null) {
+ offsetEntityVertexMap = super.filter(offsetEntityVertexMap, graphQueryPredicate);
}
- super.filter(entityVertices);
+ offsetEntityVertexMap = super.filter(offsetEntityVertexMap);
- resultIdx = collectResultVertices(ret, startIdx, limit, resultIdx, entityVertices);
+ resultIdx = collectResultVertices(ret, startIdx, limit, resultIdx, offsetEntityVertexMap, marker);
if (isLastResultPage) {
+ resultIdx = MarkerUtil.MARKER_END - 1;
break;
}
}
+
+ if (marker != null) {
+ nextOffset = resultIdx + 1;
+ }
+
} finally {
AtlasPerfTracer.log(perf);
}
@@ -324,23 +313,25 @@ public class EntitySearchProcessor extends SearchProcessor {
}
@Override
- public void filter(List<AtlasVertex> entityVertices) {
+ public LinkedHashMap<Integer, AtlasVertex> filter(LinkedHashMap<Integer,AtlasVertex> offsetEntityVertexMap) {
if (LOG.isDebugEnabled()) {
- LOG.debug("==> EntitySearchProcessor.filter({})", entityVertices.size());
+ LOG.debug("==> EntitySearchProcessor.filter({})", offsetEntityVertexMap.size());
}
// Since we already have the entity vertices, a in-memory filter will be faster than fetching the same
// vertices again with the required filtering
if (filterGraphQueryPredicate != null) {
LOG.debug("Filtering in-memory");
- CollectionUtils.filter(entityVertices, filterGraphQueryPredicate);
+ offsetEntityVertexMap = super.filter(offsetEntityVertexMap, filterGraphQueryPredicate);
}
- super.filter(entityVertices);
+ offsetEntityVertexMap = super.filter(offsetEntityVertexMap);
if (LOG.isDebugEnabled()) {
- LOG.debug("<== EntitySearchProcessor.filter(): ret.size()={}", entityVertices.size());
+ LOG.debug("<== EntitySearchProcessor.filter(): ret.size()={}", offsetEntityVertexMap.size());
}
+
+ return offsetEntityVertexMap;
}
@Override
diff --git a/repository/src/main/java/org/apache/atlas/discovery/FreeTextSearchProcessor.java b/repository/src/main/java/org/apache/atlas/discovery/FreeTextSearchProcessor.java
index 92152ff..86f2cea 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/FreeTextSearchProcessor.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/FreeTextSearchProcessor.java
@@ -96,20 +96,23 @@ public class FreeTextSearchProcessor extends SearchProcessor {
}
try {
- final int startIdx = context.getSearchParameters().getOffset();
- final int limit = context.getSearchParameters().getLimit();
+ final int limit = context.getSearchParameters().getLimit();
+ final Integer marker = context.getMarker();
+ final int startIdx = marker != null ? marker : context.getSearchParameters().getOffset();
// query to start at 0, even though startIdx can be higher - because few results in earlier retrieval could
// have been dropped: like vertices of non-entity or non-active-entity
//
// first 'startIdx' number of entries will be ignored
- int qryOffset = 0;
+ // if marker is provided, start query with marker offset
+ int qryOffset = marker != null ? marker : 0;
int resultIdx = qryOffset;
- final List<AtlasVertex> entityVertices = new ArrayList<>();
+ LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap = new LinkedHashMap<>();
+
try {
for (; ret.size() < limit; qryOffset += limit) {
- entityVertices.clear();
+ offsetEntityVertexMap.clear();
if (context.terminateSearch()) {
LOG.warn("query terminated: {}", context.getSearchParameters());
@@ -150,22 +153,27 @@ public class FreeTextSearchProcessor extends SearchProcessor {
}
}
- entityVertices.add(vertex);
+ offsetEntityVertexMap.put((qryOffset + resultCount) - 1, vertex);
}
- isLastResultPage = resultCount < limit;
-
- super.filter(entityVertices);
+ isLastResultPage = resultCount < limit;
- resultIdx = collectResultVertices(ret, startIdx, limit, resultIdx, entityVertices);
+ offsetEntityVertexMap = super.filter(offsetEntityVertexMap);
+ resultIdx = collectResultVertices(ret, startIdx, limit, resultIdx, offsetEntityVertexMap, marker);
if (isLastResultPage) {
+ resultIdx = SearchContext.MarkerUtil.MARKER_END - 1;
break;
}
}
} catch (Throwable t) {
throw t;
}
+
+ if (marker != null) {
+ nextOffset = resultIdx + 1;
+ }
+
} finally {
AtlasPerfTracer.log(perf);
}
diff --git a/repository/src/main/java/org/apache/atlas/discovery/FullTextSearchProcessor.java b/repository/src/main/java/org/apache/atlas/discovery/FullTextSearchProcessor.java
index b37d93a..2d8a448 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/FullTextSearchProcessor.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/FullTextSearchProcessor.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import static org.apache.atlas.discovery.SearchContext.MATCH_ALL_NOT_CLASSIFIED;
@@ -96,21 +97,23 @@ public class FullTextSearchProcessor extends SearchProcessor {
}
try {
- final int startIdx = context.getSearchParameters().getOffset();
final int limit = context.getSearchParameters().getLimit();
final boolean activeOnly = context.getSearchParameters().getExcludeDeletedEntities();
+ final Integer marker = context.getMarker();
+ final int startIdx = marker != null ? marker : context.getSearchParameters().getOffset();
// query to start at 0, even though startIdx can be higher - because few results in earlier retrieval could
// have been dropped: like vertices of non-entity or non-active-entity
//
// first 'startIdx' number of entries will be ignored
- int qryOffset = 0;
+ // if marker is provided, start query with marker offset
+ int qryOffset = marker != null ? marker : 0;
int resultIdx = qryOffset;
- final List<AtlasVertex> entityVertices = new ArrayList<>();
+ LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap = new LinkedHashMap<>();
for (; ret.size() < limit; qryOffset += limit) {
- entityVertices.clear();
+ offsetEntityVertexMap.clear();
if (context.terminateSearch()) {
LOG.warn("query terminated: {}", context.getSearchParameters());
@@ -141,19 +144,25 @@ public class FullTextSearchProcessor extends SearchProcessor {
continue;
}
- entityVertices.add(vertex);
+ offsetEntityVertexMap.put((qryOffset + resultCount) - 1, vertex);
}
- isLastResultPage = resultCount < limit;
+ isLastResultPage = resultCount < limit;
- super.filter(entityVertices);
+ offsetEntityVertexMap = super.filter(offsetEntityVertexMap);
- resultIdx = collectResultVertices(ret, startIdx, limit, resultIdx, entityVertices);
+ resultIdx = collectResultVertices(ret,startIdx, limit, resultIdx, offsetEntityVertexMap, marker);
if (isLastResultPage) {
+ resultIdx = SearchContext.MarkerUtil.MARKER_END - 1 ;
break;
}
}
+
+ if (marker != null) {
+ nextOffset = resultIdx + 1;
+ }
+
} finally {
AtlasPerfTracer.log(perf);
}
diff --git a/repository/src/main/java/org/apache/atlas/discovery/SearchContext.java b/repository/src/main/java/org/apache/atlas/discovery/SearchContext.java
index aa49121..01954d0 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/SearchContext.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/SearchContext.java
@@ -18,10 +18,10 @@
package org.apache.atlas.discovery;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.SearchParameters;
-import org.apache.atlas.model.discovery.SearchParameters.*;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.typedef.AtlasClassificationDef;
import org.apache.atlas.repository.Constants;
@@ -43,11 +43,25 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.atlas.discovery.SearchProcessor.ALL_TYPE_QUERY;
-import static org.apache.atlas.model.discovery.SearchParameters.*;
+import static org.apache.atlas.model.discovery.SearchParameters.ALL_CLASSIFICATIONS;
+import static org.apache.atlas.model.discovery.SearchParameters.ALL_CLASSIFICATION_TYPES;
+import static org.apache.atlas.model.discovery.SearchParameters.ALL_ENTITY_TYPES;
+import static org.apache.atlas.model.discovery.SearchParameters.FilterCriteria;
+import static org.apache.atlas.model.discovery.SearchParameters.NO_CLASSIFICATIONS;
+import static org.apache.atlas.model.discovery.SearchParameters.WILDCARD_CLASSIFICATIONS;
/*
* Search context captures elements required for performing a basic search
@@ -71,6 +85,7 @@ public class SearchContext {
private final String classificationTypeAndSubTypesQryStr;
private boolean terminateSearch = false;
private SearchProcessor searchProcessor;
+ private Integer marker;
public final static AtlasClassificationType MATCH_ALL_WILDCARD_CLASSIFICATION = new AtlasClassificationType(new AtlasClassificationDef(WILDCARD_CLASSIFICATIONS));
public final static AtlasClassificationType MATCH_ALL_CLASSIFIED = new AtlasClassificationType(new AtlasClassificationDef(ALL_CLASSIFICATIONS));
@@ -124,6 +139,10 @@ public class SearchContext {
}
}
+ if (StringUtils.isNotEmpty(searchParameters.getMarker())) {
+ marker = MarkerUtil.decodeMarker(searchParameters);
+ }
+
//remove other types if builtin type is present
filterStructTypes();
@@ -231,6 +250,8 @@ public class SearchContext {
public Set<String> getClassificationNames() {return classificationNames;}
+ public Integer getMarker() { return marker; }
+
public boolean includeEntityType(String entityType) {
return typeAndSubTypes.isEmpty() || typeAndSubTypes.contains(entityType);
}
@@ -238,9 +259,7 @@ public class SearchContext {
public boolean includeClassificationTypes(Collection<String> traitNames) {
final boolean ret;
- if (CollectionUtils.isEmpty(classificationTypes) || classificationTypeAndSubTypes.isEmpty()) {
- ret = true;
- } else if (classificationTypes.iterator().next() == MATCH_ALL_NOT_CLASSIFIED) {
+ if (classificationTypes.iterator().next() == MATCH_ALL_NOT_CLASSIFIED) {
ret = CollectionUtils.isEmpty(traitNames);
} else if (classificationTypes.iterator().next() == MATCH_ALL_CLASSIFICATION_TYPES) {
ret = CollectionUtils.isNotEmpty(traitNames);
@@ -503,4 +522,64 @@ public class SearchContext {
private AtlasEntityType getTermEntityType() {
return typeRegistry.getEntityTypeByName(TermSearchProcessor.ATLAS_GLOSSARY_TERM_ENTITY_TYPE);
}
+
+ public static class MarkerUtil {
+ private final static int IDX_HASH_CODE = 0;
+ private final static int IDX_OFFSET = 1;
+
+ private final static String MARKER_DELIMITER = ":";
+
+ @VisibleForTesting
+ final static String MARKER_START = "*";
+
+ @VisibleForTesting
+ final static int MARKER_END = -1;
+
+ public static String getNextEncMarker(SearchParameters searchParameters, Integer nextOffset) {
+ if (nextOffset == null) {
+ return null;
+ }
+
+ if (nextOffset == MARKER_END) {
+ return String.valueOf(nextOffset);
+ }
+
+ String value = searchParameters.hashCode() + MARKER_DELIMITER + nextOffset;
+ return Base64.getEncoder().encodeToString(value.getBytes());
+ }
+
+ public static Integer decodeMarker(SearchParameters searchParameters) throws AtlasBaseException {
+ if (searchParameters == null || searchParameters.getOffset() > 0) {
+ throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Marker can be used only if offset=0.");
+ }
+
+ String encodedMarker = searchParameters.getMarker();
+ if (StringUtils.equals(encodedMarker, MARKER_START)) {
+ return 0;
+ }
+
+ try {
+ byte[] inputMarkerBytes = Base64.getDecoder().decode(encodedMarker);
+ String inputMarker = new String(inputMarkerBytes);
+ if (StringUtils.isEmpty(inputMarker) || !inputMarker.contains(MARKER_DELIMITER)) {
+ throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Invalid marker found! Marker does not contain delimiter: " + MARKER_DELIMITER);
+ }
+
+ String[] str = inputMarker.split(MARKER_DELIMITER);
+ if (str == null || str.length != 2) {
+ throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Invalid marker found! Decoding using delimiter did not yield correct result!");
+ }
+
+ int hashCode = Integer.parseInt(str[IDX_HASH_CODE]);
+ int currentHashCode = searchParameters.hashCode();
+ if (hashCode == currentHashCode && Integer.parseInt(str[IDX_OFFSET]) >= 0) {
+ return Integer.parseInt(str[IDX_OFFSET]);
+ }
+
+ throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Invalid Marker! Parsing resulted in error.");
+ } catch (Exception e) {
+ throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Invalid marker!");
+ }
+ }
+ }
}
diff --git a/repository/src/main/java/org/apache/atlas/discovery/SearchProcessor.java b/repository/src/main/java/org/apache/atlas/discovery/SearchProcessor.java
index f9832c3..f69dc42 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/SearchProcessor.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/SearchProcessor.java
@@ -39,6 +39,7 @@ import org.apache.atlas.util.AtlasGremlinQueryProvider;
import org.apache.atlas.util.SearchPredicateUtil;
import org.apache.atlas.util.SearchPredicateUtil.*;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.collections.Predicate;
import org.apache.commons.collections.PredicateUtils;
import org.apache.commons.lang.StringUtils;
@@ -52,6 +53,7 @@ import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.*;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import static org.apache.atlas.SortOrder.ASCENDING;
import static org.apache.atlas.discovery.SearchContext.MATCH_ALL_CLASSIFICATION_TYPES;
@@ -137,6 +139,7 @@ public abstract class SearchProcessor {
protected SearchProcessor nextProcessor;
protected Predicate inMemoryPredicate;
protected GraphIndexQueryBuilder graphIndexQueryBuilder;
+ protected Integer nextOffset;
protected SearchProcessor(SearchContext context) {
this.context = context;
@@ -151,6 +154,10 @@ public abstract class SearchProcessor {
}
}
+ public String getNextMarker() {
+ return SearchContext.MarkerUtil.getNextEncMarker(context.getSearchParameters(), nextOffset);
+ }
+
public abstract List<AtlasVertex> execute();
public abstract long getResultCount();
@@ -181,28 +188,41 @@ public abstract class SearchProcessor {
StringUtils.equals(attrName, CUSTOM_ATTRIBUTES_PROPERTY_KEY);
}
- protected int collectResultVertices(final List<AtlasVertex> ret, final int startIdx, final int limit, int resultIdx, final List<AtlasVertex> entityVertices) {
- for (AtlasVertex entityVertex : entityVertices) {
- resultIdx++;
+ protected int collectResultVertices(final List<AtlasVertex> ret, final int startIdx, final int limit, int resultIdx, final Map<Integer, AtlasVertex> offsetEntityVertexMap, Integer marker) {
+ int lastOffset = resultIdx;
- if (resultIdx <= startIdx) {
- continue;
- }
+ for (Map.Entry<Integer, AtlasVertex> offsetToEntity : offsetEntityVertexMap.entrySet()) {
+ resultIdx++;
+
+ if (resultIdx <= startIdx) {
+ continue;
+ }
- ret.add(entityVertex);
+ lastOffset = offsetToEntity.getKey();
+ ret.add(offsetToEntity.getValue());
- if (ret.size() == limit) {
- break;
+ if (ret.size() == limit) {
+ break;
+ }
}
- }
+ return marker == null ? resultIdx : lastOffset;
+ }
- return resultIdx;
+ public LinkedHashMap<Integer, AtlasVertex> filter(LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap) {
+ if (nextProcessor != null && MapUtils.isNotEmpty(offsetEntityVertexMap)) {
+ return nextProcessor.filter(offsetEntityVertexMap);
+ }
+ return offsetEntityVertexMap;
}
- public void filter(List<AtlasVertex> entityVertices) {
- if (nextProcessor != null && CollectionUtils.isNotEmpty(entityVertices)) {
- nextProcessor.filter(entityVertices);
+ public LinkedHashMap<Integer, AtlasVertex> filter(LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap, Predicate predicate) {
+ if (predicate != null) {
+ offsetEntityVertexMap = offsetEntityVertexMap.entrySet()
+ .stream()
+ .filter(x -> predicate.evaluate(x.getValue()))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (x, y) -> y, LinkedHashMap::new));
}
+ return offsetEntityVertexMap;
}
protected Predicate buildTraitPredict(Set<AtlasClassificationType> classificationTypes) {
@@ -361,13 +381,13 @@ public abstract class SearchProcessor {
return ret;
}
- protected void filterWhiteSpaceClassification(List<AtlasVertex> entityVertices) {
- if (CollectionUtils.isNotEmpty(entityVertices)) {
- final Iterator<AtlasVertex> it = entityVertices.iterator();
- final Set<String> typeAndSubTypes = context.getClassificationTypeNames();
+ protected LinkedHashMap<Integer,AtlasVertex> filterWhiteSpaceClassification(LinkedHashMap<Integer,AtlasVertex> offsetEntityVertexMap) {
+ if (offsetEntityVertexMap != null) {
+ final Iterator<Map.Entry<Integer, AtlasVertex>> it = offsetEntityVertexMap.entrySet().iterator();
+ final Set<String> typeAndSubTypes = context.getClassificationTypeNames();
while (it.hasNext()) {
- AtlasVertex entityVertex = it.next();
+ AtlasVertex entityVertex = it.next().getValue();
List<String> classificationNames = AtlasGraphUtilsV2.getClassificationNames(entityVertex);
if (CollectionUtils.isNotEmpty(classificationNames)) {
@@ -387,6 +407,7 @@ public abstract class SearchProcessor {
it.remove();
}
}
+ return offsetEntityVertexMap;
}
protected void constructFilterQuery(StringBuilder indexQuery, Set<? extends AtlasStructType> structTypes, FilterCriteria filterCriteria, Set<String> indexAttributes) {
@@ -1205,6 +1226,18 @@ public abstract class SearchProcessor {
return vertices;
}
+ protected LinkedHashMap<Integer, AtlasVertex> getVerticesFromIndexQueryResult(Iterator<AtlasIndexQuery.Result> idxQueryResult, LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap, int qryOffset) {
+ if (idxQueryResult != null) {
+ while (idxQueryResult.hasNext()) {
+ AtlasVertex vertex = idxQueryResult.next().getVertex();
+
+ offsetEntityVertexMap.put(qryOffset++, vertex);
+ }
+ }
+
+ return offsetEntityVertexMap;
+ }
+
protected Collection<AtlasVertex> getVertices(Iterator<AtlasVertex> iterator, Collection<AtlasVertex> vertices) {
if (iterator != null) {
while (iterator.hasNext()) {
@@ -1217,6 +1250,18 @@ public abstract class SearchProcessor {
return vertices;
}
+ protected LinkedHashMap<Integer, AtlasVertex> getVertices(Iterator<AtlasVertex> iterator, LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap, int qryOffset) {
+ if (iterator != null) {
+ while (iterator.hasNext()) {
+ AtlasVertex vertex = iterator.next();
+
+ offsetEntityVertexMap.put(qryOffset++, vertex);
+ }
+ }
+
+ return offsetEntityVertexMap;
+ }
+
protected Set<String> getGuids(List<AtlasVertex> vertices) {
Set<String> ret = new HashSet<>();
diff --git a/repository/src/main/java/org/apache/atlas/discovery/TermSearchProcessor.java b/repository/src/main/java/org/apache/atlas/discovery/TermSearchProcessor.java
index 45a8158..b8a507e 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/TermSearchProcessor.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/TermSearchProcessor.java
@@ -20,12 +20,15 @@ package org.apache.atlas.discovery;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.LinkedHashMap;
import java.util.List;
-
+import java.util.Map;
+import java.util.stream.Collectors;
public class TermSearchProcessor extends SearchProcessor {
@@ -58,15 +61,22 @@ public class TermSearchProcessor extends SearchProcessor {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "TermSearchProcessor.execute(" + context + ")");
}
+ // marker functionality will not work when there is a need to fetch Term vertices and derive entities from them
try {
if (CollectionUtils.isNotEmpty(assignedEntities)) {
+ LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap = new LinkedHashMap<>();
+
final int startIdx = context.getSearchParameters().getOffset();
final int limit = context.getSearchParameters().getLimit();
final List<AtlasVertex> tmpList = new ArrayList<>(assignedEntities);
- super.filter(tmpList);
+ for (int i = 0; i < tmpList.size(); i++) {
+ offsetEntityVertexMap.put(i, tmpList.get(i));
+ }
+
+ offsetEntityVertexMap = super.filter(offsetEntityVertexMap);
- collectResultVertices(ret, startIdx, limit, 0, tmpList);
+ collectResultVertices(ret, startIdx, limit, 0, offsetEntityVertexMap, null);
}
} finally {
AtlasPerfTracer.log(perf);
@@ -79,37 +89,41 @@ public class TermSearchProcessor extends SearchProcessor {
return ret;
}
+ // this filter is never invoked in practice
@Override
- public void filter(List<AtlasVertex> entityVertices) {
+ public LinkedHashMap<Integer, AtlasVertex> filter(LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap) {
if (LOG.isDebugEnabled()) {
- LOG.debug("==> TermSearchProcessor.filter({})", entityVertices.size());
+ LOG.debug("==> TermSearchProcessor.filter({})", offsetEntityVertexMap.size());
}
- if (CollectionUtils.isNotEmpty(entityVertices)) {
+ if (MapUtils.isNotEmpty(offsetEntityVertexMap)) {
if (CollectionUtils.isEmpty(assignedEntities)) {
- entityVertices.clear();
+ offsetEntityVertexMap.clear();
} else {
- CollectionUtils.filter(entityVertices, o -> {
- if (o instanceof AtlasVertex) {
- AtlasVertex entityVertex = (AtlasVertex) o;
-
- for (AtlasVertex assignedEntity : assignedEntities) {
- if (assignedEntity.getId().equals(entityVertex.getId())) {
- return true;
+ offsetEntityVertexMap.entrySet().stream().
+ filter(o -> {
+ if (o instanceof AtlasVertex) {
+ AtlasVertex entityVertex = (AtlasVertex) o;
+
+ for (AtlasVertex assignedEntity : assignedEntities) {
+ if (assignedEntity.getId().equals(entityVertex.getId())) {
+ return true;
+ }
+ }
}
- }
- }
return false;
- });
+ }).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (x, y) -> y, LinkedHashMap::new));
}
}
- super.filter(entityVertices);
+ offsetEntityVertexMap = super.filter(offsetEntityVertexMap);
if (LOG.isDebugEnabled()) {
- LOG.debug("<== TermSearchProcessor.filter(): ret.size()={}", entityVertices.size());
+ LOG.debug("<== TermSearchProcessor.filter(): ret.size()={}", offsetEntityVertexMap.size());
}
+
+ return offsetEntityVertexMap;
}
@Override
diff --git a/repository/src/test/java/org/apache/atlas/discovery/AtlasDiscoveryServiceTest.java b/repository/src/test/java/org/apache/atlas/discovery/AtlasDiscoveryServiceTest.java
index 027827a..a9fbd43 100644
--- a/repository/src/test/java/org/apache/atlas/discovery/AtlasDiscoveryServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/discovery/AtlasDiscoveryServiceTest.java
@@ -34,6 +34,7 @@ import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
import org.testng.Assert;
import org.testng.annotations.*;
@@ -73,67 +74,51 @@ public class AtlasDiscoveryServiceTest extends BasicTestSetup {
SearchParameters params = new SearchParameters();
params.setTermName(SALES_TERM+"@"+SALES_GLOSSARY);
- List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
- Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
- assertEquals(entityHeaders.size(), 10);
+ assertSearchProcessorWithoutMarker(params, 10);
}
// TSP execute and CSP,ESP filter
@Test
- public void term_tag() throws AtlasBaseException {
+ public void termTag() throws AtlasBaseException {
SearchParameters params = new SearchParameters();
params.setTermName(SALES_TERM+"@"+SALES_GLOSSARY);
params.setClassification(METRIC_CLASSIFICATION);
- List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
- Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
- for(AtlasEntityHeader e : entityHeaders){
- System.out.println(e.toString());
- }
- assertEquals(entityHeaders.size(), 4);
+ assertSearchProcessorWithoutMarker(params, 4);
}
@Test
- public void term_entity() throws AtlasBaseException {
+ public void termEntity() throws AtlasBaseException {
SearchParameters params = new SearchParameters();
params.setTermName(SALES_TERM+"@"+SALES_GLOSSARY);
params.setTypeName(HIVE_TABLE_TYPE);
- List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
- Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
- assertEquals(entityHeaders.size(), 10);
+ assertSearchProcessorWithoutMarker(params, 10);
}
@Test
- public void term_entity_tag() throws AtlasBaseException {
+ public void termEntityTag() throws AtlasBaseException {
SearchParameters params = new SearchParameters();
params.setTermName(SALES_TERM+"@"+SALES_GLOSSARY);
params.setTypeName(HIVE_TABLE_TYPE);
params.setClassification(DIMENSIONAL_CLASSIFICATION);
List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
Assert.assertTrue(CollectionUtils.isEmpty(entityHeaders));
}
//FSP execute and CSP,ESP filter
@Test
- public void query_ALLTag() throws AtlasBaseException {
+ public void queryALLTag() throws AtlasBaseException {
SearchParameters params = new SearchParameters();
params.setClassification(ALL_CLASSIFICATION_TYPES);
params.setQuery("sales");
- List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
- Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
- assertEquals(entityHeaders.size(), 5);
+ assertSearchProcessorWithoutMarker(params, 5);
}
@Test
- public void query_ALLTag_tagFilter() throws AtlasBaseException {
+ public void queryALLTagTagFilter() throws AtlasBaseException {
SearchParameters params = new SearchParameters();
params.setClassification(ALL_CLASSIFICATION_TYPES);
//typeName will check only the classification name, not propagated classifications
@@ -141,103 +126,79 @@ public class AtlasDiscoveryServiceTest extends BasicTestSetup {
params.setTagFilters(fc);
params.setQuery("sales");
- List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
- Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
- assertEquals(entityHeaders.size(), 4);
+ assertSearchProcessorWithoutMarker(params, 4);
}
@Test
- public void query_NOTCLASSIFIEDTag() throws AtlasBaseException {
+ public void queryNOTCLASSIFIEDTag() throws AtlasBaseException {
SearchParameters params = new SearchParameters();
params.setClassification(NO_CLASSIFICATIONS);
params.setQuery("sales");
- List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
- Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
- assertEquals(entityHeaders.size(), 11);
+ assertSearchProcessorWithoutMarker(params, 11);
}
@Test
- public void query_ALLWildcardTag() throws AtlasBaseException {
+ public void queryALLWildcardTag() throws AtlasBaseException {
SearchParameters params = new SearchParameters();
params.setClassification("*");
params.setQuery("sales");
- List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
- Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
- assertEquals(entityHeaders.size(), 5);
+ assertSearchProcessorWithoutMarker(params, 5);
}
@Test
- public void query_wildcardTag() throws AtlasBaseException {
+ public void queryWildcardTag() throws AtlasBaseException {
SearchParameters params = new SearchParameters();
params.setClassification("Dimen*on");
params.setQuery("sales");
- List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
- Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
- assertEquals(entityHeaders.size(), 2);
+ assertSearchProcessorWithoutMarker(params, 2);
}
@Test
- public void query_tag() throws AtlasBaseException {
+ public void queryTag() throws AtlasBaseException {
SearchParameters params = new SearchParameters();
params.setClassification(METRIC_CLASSIFICATION);
params.setQuery("sales");
- List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
- Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
- assertEquals(entityHeaders.size(), 3);
+ assertSearchProcessorWithoutMarker(params, 3);
}
@Test
- public void query_tag_tagFilter() throws AtlasBaseException {
+ public void queryTagTagFilter() throws AtlasBaseException {
SearchParameters params = new SearchParameters();
params.setClassification(METRIC_CLASSIFICATION);
SearchParameters.FilterCriteria fc = getSingleFilterCondition("__timestamp", SearchParameters.Operator.LT, String.valueOf(System.currentTimeMillis()));
params.setTagFilters(fc);
params.setQuery("sales");
- List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
- Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
- assertEquals(entityHeaders.size(), 3);
+ assertSearchProcessorWithoutMarker(params, 3);
}
@Test
- public void query_entity() throws AtlasBaseException {
+ public void queryEntity() throws AtlasBaseException {
SearchParameters params = new SearchParameters();
params.setTypeName(HIVE_TABLE_TYPE);
params.setQuery("sales");
- List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
- Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
- assertEquals(entityHeaders.size(), 4);
+ assertSearchProcessorWithoutMarker(params, 4);
}
@Test
- public void query_entity_entityFilter() throws AtlasBaseException {
+ public void queryEntityEntityFilter() throws AtlasBaseException {
SearchParameters params = new SearchParameters();
params.setTypeName(HIVE_TABLE_TYPE);
SearchParameters.FilterCriteria fc = getSingleFilterCondition("tableType", Operator.NOT_NULL, "null");
params.setEntityFilters(fc);
params.setQuery("sales");
- List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
- Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
- assertEquals(entityHeaders.size(), 3);
+ assertSearchProcessorWithoutMarker(params, 3);
}
@Test
- public void query_entity_entityFilter_tag() throws AtlasBaseException {
+ public void queryEntityEntityFilterTag() throws AtlasBaseException {
SearchParameters params = new SearchParameters();
params.setTypeName(HIVE_TABLE_TYPE);
SearchParameters.FilterCriteria fc = getSingleFilterCondition("tableType", Operator.IS_NULL, "null");
@@ -245,14 +206,11 @@ public class AtlasDiscoveryServiceTest extends BasicTestSetup {
params.setClassification(DIMENSIONAL_CLASSIFICATION);
params.setQuery("sales");
- List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
- Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
- assertEquals(entityHeaders.size(), 1);
+ assertSearchProcessorWithoutMarker(params, 1);
}
@Test
- public void query_entity_entityFilter_tag_tagFilter() throws AtlasBaseException {
+ public void queryEntityEntityFilterTagTagFilter() throws AtlasBaseException {
SearchParameters params = new SearchParameters();
params.setTypeName(HIVE_TABLE_TYPE);
SearchParameters.FilterCriteria fcE = getSingleFilterCondition("tableType", Operator.IS_NULL, "null");
@@ -262,14 +220,11 @@ public class AtlasDiscoveryServiceTest extends BasicTestSetup {
SearchParameters.FilterCriteria fcC = getSingleFilterCondition("attr1", Operator.EQ, "value1");
params.setTagFilters(fcC);
- List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
- Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
- assertEquals(entityHeaders.size(), 1);
+ assertSearchProcessorWithoutMarker(params, 1);
}
@Test
- public void query_entity_tag_tagFilter() throws AtlasBaseException {
+ public void queryEntityTagTagFilter() throws AtlasBaseException {
SearchParameters params = new SearchParameters();
params.setTypeName(HIVE_TABLE_TYPE);
params.setClassification(METRIC_CLASSIFICATION);
@@ -277,29 +232,22 @@ public class AtlasDiscoveryServiceTest extends BasicTestSetup {
params.setTagFilters(fc);
params.setQuery("sales");
- List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
- Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
- assertEquals(entityHeaders.size(), 2);
-
+ assertSearchProcessorWithoutMarker(params, 2);
}
@Test
- public void query_entity_tag() throws AtlasBaseException {
+ public void queryEntityTag() throws AtlasBaseException {
SearchParameters params = new SearchParameters();
params.setTypeName(HIVE_TABLE_TYPE);
params.setClassification(METRIC_CLASSIFICATION);
params.setQuery("sales");
- List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
- Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
- assertEquals(entityHeaders.size(), 2);
+ assertSearchProcessorWithoutMarker(params, 2);
}
// CSP Execute and ESP filter
@Test
- public void entity_entityFilter_tag_tagFilter() throws AtlasBaseException {
+ public void entityEntityFilterTagTagFilter() throws AtlasBaseException {
SearchParameters params = new SearchParameters();
params.setTypeName(HIVE_TABLE_TYPE);
SearchParameters.FilterCriteria fcE = getSingleFilterCondition("tableType", Operator.EQ, "Managed");
@@ -308,27 +256,171 @@ public class AtlasDiscoveryServiceTest extends BasicTestSetup {
SearchParameters.FilterCriteria fcC = getSingleFilterCondition("__timestamp", SearchParameters.Operator.LT, String.valueOf(System.currentTimeMillis()));
params.setTagFilters(fcC);
- List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
- Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
- assertEquals(entityHeaders.size(), 4);
-
+ assertSearchProcessorWithoutMarker(params, 4);
}
@Test
- public void entity_tag_tagFilter() throws AtlasBaseException {
+ public void entityTagTagFilter() throws AtlasBaseException {
SearchParameters params = new SearchParameters();
params.setTypeName(HIVE_TABLE_TYPE);
params.setClassification(METRIC_CLASSIFICATION);
SearchParameters.FilterCriteria fc = getSingleFilterCondition("__timestamp", SearchParameters.Operator.LT, String.valueOf(System.currentTimeMillis()));
params.setTagFilters(fc);
+ assertSearchProcessorWithoutMarker(params, 4);
+ }
+
+ @Test
+ public void searchWith0offsetMarker() throws AtlasBaseException {
+ SearchParameters params = new SearchParameters();
+ params.setTypeName(HIVE_TABLE_TYPE);
+ params.setOffset(0);
+ params.setMarker(SearchContext.MarkerUtil.MARKER_START);
+ params.setLimit(5);
+
+ assertSearchProcessorWithMarker(params, 5);
+ }
+
+ @Test
+ public void searchWithNoOffsetMarker() throws AtlasBaseException {
+ SearchParameters params = new SearchParameters();
+ params.setTypeName(HIVE_TABLE_TYPE);
+ params.setMarker(SearchContext.MarkerUtil.MARKER_START);
+ params.setLimit(5);
+
+ assertSearchProcessorWithMarker(params, 5);
+ }
+
+ @Test
+ public void searchWithGreaterThan0OffsetBlankMarker() throws AtlasBaseException {
+ SearchParameters params = new SearchParameters();
+ params.setTypeName(HIVE_TABLE_TYPE);
+ params.setOffset(1);
+ params.setMarker("");
+ params.setLimit(5);
+
+ assertSearchProcessorWithoutMarker(params, 5);
+ }
+
+ @Test(expectedExceptions = AtlasBaseException.class, expectedExceptionsMessageRegExp = "Marker can be used only if offset=0.")
+ public void searchWithGreaterThan0OffsetMarker() throws AtlasBaseException {
+ SearchParameters params = new SearchParameters();
+ params.setTypeName(HIVE_TABLE_TYPE);
+ params.setOffset(1);
+ params.setMarker(SearchContext.MarkerUtil.MARKER_START);
+ params.setLimit(5);
List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
+ assertNotNull(entityHeaders);
+ }
+
+ @Test
+ public void searchWithMarkerSet() throws AtlasBaseException {
+ SearchParameters params = new SearchParameters();
+ params.setTypeName(HIVE_TABLE_TYPE);
+ params.setMarker(SearchContext.MarkerUtil.MARKER_START);
+ params.setLimit(5);
+ AtlasSearchResult searchResult = discoveryService.searchWithParameters(params);
+ List<AtlasEntityHeader> entityHeaders = searchResult.getEntities();
+
+ Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
+ assertEquals(entityHeaders.size(), 5);
+ Assert.assertTrue(StringUtils.isNotEmpty(searchResult.getNextMarker()));
+
+ //get next marker and set in marker of subsequent request
+ params.setMarker(searchResult.getNextMarker());
+ AtlasSearchResult nextsearchResult = discoveryService.searchWithParameters(params);
+ List<AtlasEntityHeader> nextentityHeaders = nextsearchResult.getEntities();
+
+ Assert.assertTrue(CollectionUtils.isNotEmpty(nextentityHeaders));
+ Assert.assertTrue(StringUtils.isNotEmpty(nextsearchResult.getNextMarker()));
+
+ if (entityHeaders.size() < params.getLimit()) {
+ Assert.assertTrue(nextsearchResult.getNextMarker() == String.valueOf(-1));
+ }
+ }
+
+ @Test(expectedExceptions = AtlasBaseException.class, expectedExceptionsMessageRegExp = "Invalid marker!")
+ public void searchWithInvalidMarker() throws AtlasBaseException {
+ SearchParameters params = new SearchParameters();
+ params.setTypeName(HIVE_TABLE_TYPE);
+ params.setMarker(SearchContext.MarkerUtil.MARKER_START);
+ params.setLimit(5);
+ AtlasSearchResult searchResult = discoveryService.searchWithParameters(params);
+ List<AtlasEntityHeader> entityHeaders = searchResult.getEntities();
+
+ Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
+ assertEquals(entityHeaders.size(), 5);
+ Assert.assertTrue(StringUtils.isNotEmpty(searchResult.getNextMarker()));
+
+ //get next marker and set in marker of subsequent request
+ params.setMarker(searchResult.getNextMarker()+"abc");
+ AtlasSearchResult nextsearchResult = discoveryService.searchWithParameters(params);
+
+ }
+
+ @Test
+ public void searchWithLastPageMarker() throws AtlasBaseException {
+ SearchParameters params = new SearchParameters();
+ params.setTypeName(HIVE_TABLE_TYPE);
+ params.setExcludeDeletedEntities(true);
+ params.setMarker(SearchContext.MarkerUtil.MARKER_START);
+ params.setLimit(5);
+ AtlasSearchResult searchResult = discoveryService.searchWithParameters(params);
+ List<AtlasEntityHeader> entityHeaders = searchResult.getEntities();
+
Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
- assertEquals(entityHeaders.size(), 4);
+ assertEquals(entityHeaders.size(), 5);
+ Assert.assertTrue(StringUtils.isNotEmpty(searchResult.getNextMarker()));
+
+ long maxEntities = searchResult.getApproximateCount();
+
+ //get next marker and set in marker of subsequent request
+ params.setMarker(SearchContext.MarkerUtil.MARKER_START);
+ params.setLimit((int)maxEntities + 10);
+ AtlasSearchResult nextsearchResult = discoveryService.searchWithParameters(params);
+
+ Assert.assertTrue(nextsearchResult.getNextMarker().equals("-1"));
}
+
+ @Test //marker functionality is not supported
+ public void termMarker() throws AtlasBaseException {
+ SearchParameters params = new SearchParameters();
+ params.setTermName(SALES_TERM+"@"+SALES_GLOSSARY);
+ params.setMarker("*");
+
+ assertSearchProcessorWithoutMarker(params, 10);
+
+ }
+
+ @Test
+ public void queryEntityTagMarker() throws AtlasBaseException {
+ SearchParameters params = new SearchParameters();
+ params.setTypeName(HIVE_TABLE_TYPE);
+ params.setClassification(METRIC_CLASSIFICATION);
+ params.setQuery("sales");
+ params.setMarker("*");
+ params.setLimit(5);
+
+ assertSearchProcessorWithMarker(params, 2);
+ }
+
+ // CSP Execute and ESP filter
+ @Test
+ public void entityEntityFilterTagTagFilterMarker() throws AtlasBaseException {
+ SearchParameters params = new SearchParameters();
+ params.setTypeName(HIVE_TABLE_TYPE);
+ SearchParameters.FilterCriteria fcE = getSingleFilterCondition("tableType", Operator.EQ, "Managed");
+ params.setEntityFilters(fcE);
+ params.setClassification(METRIC_CLASSIFICATION);
+ SearchParameters.FilterCriteria fcC = getSingleFilterCondition("__timestamp", SearchParameters.Operator.LT, String.valueOf(System.currentTimeMillis()));
+ params.setTagFilters(fcC);
+ params.setMarker("*");
+ assertSearchProcessorWithoutMarker(params, 4);
+ }
+
+
String spChar1 = "default.test_dot_name";
String spChar2 = "default.test_dot_name@db.test_db";
String spChar3 = "default.test_dot_name_12.col1@db1";
@@ -794,6 +886,29 @@ public class AtlasDiscoveryServiceTest extends BasicTestSetup {
entityStore.addClassification(Arrays.asList(guid), new AtlasClassification(DIMENSIONAL_CLASSIFICATION, attr));
}
+
+ private void assertSearchProcessorWithoutMarker(SearchParameters params, int expected) throws AtlasBaseException {
+ assertSearchProcessor(params, expected, false);
+ }
+
+ private void assertSearchProcessorWithMarker(SearchParameters params, int expected) throws AtlasBaseException {
+ assertSearchProcessor(params, expected, true);
+ }
+
+ private void assertSearchProcessor(SearchParameters params, int expected, boolean checkMarker) throws AtlasBaseException {
+ AtlasSearchResult searchResult = discoveryService.searchWithParameters(params);
+ List<AtlasEntityHeader> entityHeaders = searchResult.getEntities();
+
+ Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
+ assertEquals(entityHeaders.size(), expected);
+
+ if (checkMarker) {
+ Assert.assertTrue(StringUtils.isNotEmpty(searchResult.getNextMarker()));
+ } else {
+ Assert.assertTrue(StringUtils.isEmpty(searchResult.getNextMarker()));
+ }
+ }
+
@AfterClass
public void teardown() throws Exception {
AtlasGraphProvider.cleanup();
diff --git a/repository/src/test/java/org/apache/atlas/discovery/ClassificationSearchProcessorTest.java b/repository/src/test/java/org/apache/atlas/discovery/ClassificationSearchProcessorTest.java
index e1ebbfc..121dca9 100644
--- a/repository/src/test/java/org/apache/atlas/discovery/ClassificationSearchProcessorTest.java
+++ b/repository/src/test/java/org/apache/atlas/discovery/ClassificationSearchProcessorTest.java
@@ -33,6 +33,7 @@ import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -263,6 +264,62 @@ public class ClassificationSearchProcessorTest extends BasicTestSetup {
}
+ @Test
+ public void searchByWildcardTagMarker() throws AtlasBaseException {
+ final String LAST_MARKER = "-1";
+ SearchParameters params = new SearchParameters();
+ params.setClassification("*");
+ int limit = 5;
+ String marker = "*";
+ params.setLimit(limit);
+
+ while (!StringUtils.equals(marker, LAST_MARKER)) {
+ params.setMarker(marker);
+ SearchContext context = new SearchContext(params, typeRegistry, graph, indexer.getVertexIndexKeys());
+ ClassificationSearchProcessor processor = new ClassificationSearchProcessor(context);
+ List<AtlasVertex> vertices = processor.execute();
+ long totalCount = vertices.size();
+ marker = processor.getNextMarker();
+
+ if (totalCount < limit) {
+ assertEquals(marker, LAST_MARKER);
+ break;
+ } else {
+ Assert.assertNotNull(marker);
+ assertEquals(vertices.size(), 5);
+ }
+ }
+ }
+
+ @Test //marker functionality is not supported in this case
+ public void searchByTagAndGraphSysFiltersMarker() throws AtlasBaseException {
+ SearchParameters params = new SearchParameters();
+ params.setClassification(DIMENSION_CLASSIFICATION);
+ FilterCriteria filterCriteria = getSingleFilterCondition("__entityStatus", Operator.EQ, "DELETED");
+ params.setTagFilters(filterCriteria);
+ params.setExcludeDeletedEntities(false);
+ params.setLimit(20);
+ params.setMarker("*");
+
+ SearchContext context = new SearchContext(params, typeRegistry, graph, indexer.getVertexIndexKeys());
+ ClassificationSearchProcessor processor = new ClassificationSearchProcessor(context);
+ List<AtlasVertex> vertices = processor.execute();
+
+ Assert.assertTrue(CollectionUtils.isNotEmpty(vertices));
+ assertEquals(vertices.size(), 1);
+ List<String> guids = vertices.stream().map(g -> {
+ try {
+ return entityRetriever.toAtlasEntityHeader(g).getGuid();
+ } catch (AtlasBaseException e) {
+ fail("Failure in mapping vertex to AtlasEntityHeader");
+ }
+ return "";
+ }).collect(Collectors.toList());
+ Assert.assertTrue(guids.contains(dimensionTagDeleteGuid));
+
+ Assert.assertNull(processor.getNextMarker());
+ }
+
private void createDimensionTaggedEntityAndDelete() throws AtlasBaseException {
AtlasEntity entityToDelete = new AtlasEntity(HIVE_TABLE_TYPE);
entityToDelete.setAttribute("name", "entity to be deleted");
diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java
index 4c7b622..e4c74a9 100644
--- a/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java
+++ b/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java
@@ -197,7 +197,8 @@ public class DiscoveryREST {
@QueryParam("sortOrder") SortOrder sortOrder,
@QueryParam("excludeDeletedEntities") boolean excludeDeletedEntities,
@QueryParam("limit") int limit,
- @QueryParam("offset") int offset) throws AtlasBaseException {
+ @QueryParam("offset") int offset,
+ @QueryParam("marker") String marker) throws AtlasBaseException {
Servlets.validateQueryParamLength("typeName", typeName);
Servlets.validateQueryParamLength("classification", classification);
Servlets.validateQueryParamLength("sortBy", sortByAttribute);
@@ -220,6 +221,7 @@ public class DiscoveryREST {
searchParameters.setExcludeDeletedEntities(excludeDeletedEntities);
searchParameters.setLimit(limit);
searchParameters.setOffset(offset);
+ searchParameters.setMarker(marker);
searchParameters.setSortBy(sortByAttribute);
searchParameters.setSortOrder(sortOrder);