You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by pi...@apache.org on 2021/05/25 16:45:07 UTC

[atlas] branch branch-2.0 updated: ATLAS-4254 : Basic Search : Optimize pagination

This is an automated email from the ASF dual-hosted git repository.

pinal pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 5086968  ATLAS-4254 : Basic Search : Optimize pagination
5086968 is described below

commit 50869680e4cc5c27d80738101f2a122c52b0000f
Author: Pinal <pinal-shah>
AuthorDate: Mon May 10 21:55:10 2021 +0530

    ATLAS-4254 : Basic Search : Optimize pagination
    
    Signed-off-by: Pinal <pinal-shah>
---
 .../atlas/model/discovery/AtlasSearchResult.java   |  11 +-
 .../atlas/model/discovery/SearchParameters.java    |  20 +-
 .../discovery/ClassificationSearchProcessor.java   |  81 ++++--
 .../atlas/discovery/EntityDiscoveryService.java    |   5 +
 .../atlas/discovery/EntitySearchProcessor.java     |  79 +++---
 .../atlas/discovery/FreeTextSearchProcessor.java   |  28 +-
 .../atlas/discovery/FullTextSearchProcessor.java   |  25 +-
 .../org/apache/atlas/discovery/SearchContext.java  |  91 ++++++-
 .../apache/atlas/discovery/SearchProcessor.java    |  83 ++++--
 .../atlas/discovery/TermSearchProcessor.java       |  52 ++--
 .../atlas/discovery/AtlasDiscoveryServiceTest.java | 301 ++++++++++++++-------
 .../ClassificationSearchProcessorTest.java         |  57 ++++
 .../org/apache/atlas/web/rest/DiscoveryREST.java   |   4 +-
 13 files changed, 603 insertions(+), 234 deletions(-)

diff --git a/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java b/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java
index e1c550e..ce0f84b 100644
--- a/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java
+++ b/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java
@@ -53,6 +53,7 @@ public class AtlasSearchResult implements Serializable {
     private List<AtlasFullTextResult>      fullTextResult;
     private Map<String, AtlasEntityHeader> referredEntities;
     private long                           approximateCount = -1;
+    private String                         nextMarker;
 
     public AtlasSearchResult() {}
 
@@ -131,8 +132,12 @@ public class AtlasSearchResult implements Serializable {
 
     public void setApproximateCount(long approximateCount) { this.approximateCount = approximateCount; }
 
+    public String getNextMarker() { return nextMarker; }
+
+    public void setNextMarker(String nextMarker) { this.nextMarker = nextMarker; }
+
     @Override
-    public int hashCode() { return Objects.hash(queryType, searchParameters, queryText, type, classification, entities, attributes, fullTextResult, referredEntities); }
+    public int hashCode() { return Objects.hash(queryType, searchParameters, queryText, type, classification, entities, attributes, fullTextResult, referredEntities, nextMarker); }
 
     @Override
     public boolean equals(Object o) {
@@ -147,7 +152,8 @@ public class AtlasSearchResult implements Serializable {
                Objects.equals(entities, that.entities) &&
                Objects.equals(attributes, that.attributes) &&
                Objects.equals(fullTextResult, that.fullTextResult) &&
-               Objects.equals(referredEntities, that.referredEntities);
+               Objects.equals(referredEntities, that.referredEntities) &&
+               Objects.equals(nextMarker, that.nextMarker);
     }
 
     public void addEntity(AtlasEntityHeader newEntity) {
@@ -190,6 +196,7 @@ public class AtlasSearchResult implements Serializable {
                 ", fullTextResult=" + fullTextResult +
                 ", referredEntities=" + referredEntities +
                 ", approximateCount=" + approximateCount +
+                ", nextMarker=" + nextMarker +
                 '}';
     }
 
diff --git a/intg/src/main/java/org/apache/atlas/model/discovery/SearchParameters.java b/intg/src/main/java/org/apache/atlas/model/discovery/SearchParameters.java
index 9d2cd4f..78fb4a4 100644
--- a/intg/src/main/java/org/apache/atlas/model/discovery/SearchParameters.java
+++ b/intg/src/main/java/org/apache/atlas/model/discovery/SearchParameters.java
@@ -52,6 +52,7 @@ public class SearchParameters implements Serializable {
     private boolean includeSubClassifications       = true;
     private int     limit;
     private int     offset;
+    private String  marker;
 
     private FilterCriteria entityFilters;
     private FilterCriteria tagFilters;
@@ -216,6 +217,16 @@ public class SearchParameters implements Serializable {
     }
 
     /**
+     * @return marker (offset of the next page)
+     */
+    public String getMarker() { return marker; }
+
+    /**
+     * @param marker marker (offset of the next page)
+     */
+    public void setMarker(String marker) { this.marker = marker; }
+
+    /**
      * Entity attribute filters for the type (if type name is specified)
      * @return
      */
@@ -294,6 +305,8 @@ public class SearchParameters implements Serializable {
         SearchParameters that = (SearchParameters) o;
         return excludeDeletedEntities == that.excludeDeletedEntities &&
                 includeClassificationAttributes == that.includeClassificationAttributes &&
+                includeSubTypes == that.includeSubTypes &&
+                includeSubClassifications == that.includeSubClassifications &&
                 limit == that.limit &&
                 offset == that.offset &&
                 Objects.equals(query, that.query) &&
@@ -309,8 +322,9 @@ public class SearchParameters implements Serializable {
 
     @Override
     public int hashCode() {
-        return Objects.hash(query, typeName, classification, termName, excludeDeletedEntities, includeClassificationAttributes,
-                            limit, offset, entityFilters, tagFilters, attributes, sortBy, sortOrder);
+        return Objects.hash(query, typeName, classification, termName, includeSubTypes, includeSubClassifications,
+                            excludeDeletedEntities, includeClassificationAttributes, limit, offset, entityFilters,
+                            tagFilters, attributes, sortBy, sortOrder);
     }
 
     public StringBuilder toString(StringBuilder sb) {
@@ -323,6 +337,8 @@ public class SearchParameters implements Serializable {
         sb.append(", typeName='").append(typeName).append('\'');
         sb.append(", classification='").append(classification).append('\'');
         sb.append(", termName='").append(termName).append('\'');
+        sb.append(", includeSubTypes='").append(includeSubTypes).append('\'');
+        sb.append(", includeSubClassifications='").append(includeSubClassifications).append('\'');
         sb.append(", excludeDeletedEntities=").append(excludeDeletedEntities);
         sb.append(", includeClassificationAttributes=").append(includeClassificationAttributes);
         sb.append(", limit=").append(limit);
diff --git a/repository/src/main/java/org/apache/atlas/discovery/ClassificationSearchProcessor.java b/repository/src/main/java/org/apache/atlas/discovery/ClassificationSearchProcessor.java
index 647ff9c..dfcc441 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/ClassificationSearchProcessor.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/ClassificationSearchProcessor.java
@@ -27,6 +27,7 @@ import org.apache.atlas.type.AtlasClassificationType;
 import org.apache.atlas.util.SearchPredicateUtil;
 import org.apache.atlas.utils.AtlasPerfTracer;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.collections.Predicate;
 import org.apache.commons.collections.PredicateUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -212,26 +213,32 @@ public class ClassificationSearchProcessor extends SearchProcessor {
         }
 
         try {
-            final int     startIdx   = context.getSearchParameters().getOffset();
             final int     limit      = context.getSearchParameters().getLimit();
+                  Integer marker     = context.getMarker();
 
             // query to start at 0, even though startIdx can be higher - because few results in earlier retrieval could
             // have been dropped: like non-active-entities or duplicate-entities (same entity pointed to by multiple
             // classifications in the result)
             //
             // first 'startIdx' number of entries will be ignored
-            int qryOffset = 0;
+            // marker functionality does not work when classificationVertices must be fetched and the entities derived from them
+            if (indexQuery == null) {
+                marker = null;
+            }
+            // if marker is provided, start query with marker offset
+            int startIdx  = marker != null ? marker : context.getSearchParameters().getOffset();
+            int qryOffset = marker != null ? marker : 0;
             int resultIdx = qryOffset;
 
-            final Set<String>       processedGuids         = new HashSet<>();
-            final List<AtlasVertex> entityVertices         = new ArrayList<>();
-            final List<AtlasVertex> classificationVertices = new ArrayList<>();
+            final Set<String>                   processedGuids         = new HashSet<>();
+            LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap  = new LinkedHashMap<>();
+            final List<AtlasVertex>             classificationVertices = new ArrayList<>();
 
             final String          sortBy                = context.getSearchParameters().getSortBy();
             final SortOrder       sortOrder             = context.getSearchParameters().getSortOrder();
 
             for (; ret.size() < limit; qryOffset += limit) {
-                entityVertices.clear();
+                offsetEntityVertexMap.clear();
                 classificationVertices.clear();
 
                 if (context.terminateSearch()) {
@@ -251,12 +258,12 @@ public class ClassificationSearchProcessor extends SearchProcessor {
                         queryResult = indexQuery.vertices(qryOffset, limit);
                     }
 
-                    getVerticesFromIndexQueryResult(queryResult, entityVertices);
-                    isLastResultPage = entityVertices.size() < limit;
+                    offsetEntityVertexMap   = getVerticesFromIndexQueryResult(queryResult, offsetEntityVertexMap, qryOffset);
+                    isLastResultPage        = offsetEntityVertexMap.size() < limit;
 
                     // Do in-memory filtering
-                    CollectionUtils.filter(entityVertices, traitPredicate);
-                    CollectionUtils.filter(entityVertices, isEntityPredicate);
+                    offsetEntityVertexMap = super.filter(offsetEntityVertexMap, traitPredicate);
+                    offsetEntityVertexMap = super.filter(offsetEntityVertexMap, isEntityPredicate);
 
                 } else {
                     if (classificationIndexQuery != null) {
@@ -283,11 +290,14 @@ public class ClassificationSearchProcessor extends SearchProcessor {
                 // Since tag filters are present, we need to collect the entity vertices after filtering the classification
                 // vertex results (as these might be lower in number)
                 if (CollectionUtils.isNotEmpty(classificationVertices)) {
+                    int resultCount = 0;
+
                     for (AtlasVertex classificationVertex : classificationVertices) {
                         Iterable<AtlasEdge> edges = classificationVertex.getEdges(AtlasEdgeDirection.IN, Constants.CLASSIFICATION_LABEL);
 
                         for (AtlasEdge edge : edges) {
                             AtlasVertex entityVertex = edge.getOutVertex();
+                            resultCount++;
 
                             String guid = AtlasGraphUtilsV2.getIdFromVertex(entityVertex);
 
@@ -295,7 +305,7 @@ public class ClassificationSearchProcessor extends SearchProcessor {
                                 continue;
                             }
 
-                            entityVertices.add(entityVertex);
+                            offsetEntityVertexMap.put((qryOffset + resultCount) - 1, entityVertex);
 
                             processedGuids.add(guid);
                         }
@@ -303,22 +313,28 @@ public class ClassificationSearchProcessor extends SearchProcessor {
                 }
 
                 if (whiteSpaceFilter) {
-                    filterWhiteSpaceClassification(entityVertices);
+                    offsetEntityVertexMap = filterWhiteSpaceClassification(offsetEntityVertexMap);
                 }
-                    // Do in-memory filtering
-                CollectionUtils.filter(entityVertices, isEntityPredicate);
+                // Do in-memory filtering
+                offsetEntityVertexMap = super.filter(offsetEntityVertexMap, isEntityPredicate);
                 if (activePredicate != null) {
-                    CollectionUtils.filter(entityVertices, activePredicate);
+                    offsetEntityVertexMap = super.filter(offsetEntityVertexMap, activePredicate);
                 }
 
-                super.filter(entityVertices);
+                offsetEntityVertexMap = super.filter(offsetEntityVertexMap);
 
-                resultIdx = collectResultVertices(ret, startIdx, limit, resultIdx, entityVertices);
+                resultIdx = collectResultVertices(ret, startIdx, limit, resultIdx, offsetEntityVertexMap, marker);
 
                 if (isLastResultPage) {
+                    resultIdx = SearchContext.MarkerUtil.MARKER_END - 1;
                     break;
                 }
             }
+
+            if (marker != null) {
+                nextOffset = resultIdx + 1;
+            }
+
         } finally {
             AtlasPerfTracer.log(perf);
         }
@@ -331,20 +347,23 @@ public class ClassificationSearchProcessor extends SearchProcessor {
     }
 
     @Override
-    public void filter(List<AtlasVertex> entityVertices) {
+    public LinkedHashMap<Integer, AtlasVertex> filter(LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("==> ClassificationSearchProcessor.filter({})", entityVertices.size());
+            LOG.debug("==> ClassificationSearchProcessor.filter({})", offsetEntityVertexMap.size());
         }
 
         if (inMemoryPredicate != null) {
             //in case of classification type + index attributes
-            CollectionUtils.filter(entityVertices, traitPredicate);
+            offsetEntityVertexMap = super.filter(offsetEntityVertexMap, traitPredicate);
 
             //filter attributes (filterCriteria). Find classification vertex(typeName = classification) from entity vertex (traitName = classification)
             final Set<String> processedGuids = new HashSet<>();
-            List<AtlasVertex> matchEntityVertices = new ArrayList<>();
-            if (CollectionUtils.isNotEmpty(entityVertices)) {
-                for (AtlasVertex entityVertex : entityVertices) {
+            LinkedHashMap<Integer, AtlasVertex> matchEntityVertices = new LinkedHashMap<>();
+
+            if (MapUtils.isNotEmpty(offsetEntityVertexMap)) {
+                for (Map.Entry<Integer, AtlasVertex> offsetToEntity : offsetEntityVertexMap.entrySet()) {
+
+                    AtlasVertex entityVertex  = offsetToEntity.getValue();
                     Iterable<AtlasEdge> edges = entityVertex.getEdges(AtlasEdgeDirection.OUT, Constants.CLASSIFICATION_LABEL);
 
                     for (AtlasEdge edge : edges) {
@@ -358,7 +377,7 @@ public class ClassificationSearchProcessor extends SearchProcessor {
                                 continue;
                             }
 
-                            matchEntityVertices.add(entityVertex);
+                            matchEntityVertices.put(offsetToEntity.getKey(), entityVertex);
                             processedGuids.add(guid);
                             break;
 
@@ -366,20 +385,22 @@ public class ClassificationSearchProcessor extends SearchProcessor {
                     }
                 }
             }
-            entityVertices.clear();
-            entityVertices.addAll(matchEntityVertices);
+            offsetEntityVertexMap.clear();
+            offsetEntityVertexMap.putAll(matchEntityVertices);
 
         } else {
             //in case of only classsification type
-            CollectionUtils.filter(entityVertices, traitPredicate);
-            CollectionUtils.filter(entityVertices, isEntityPredicate);
+            offsetEntityVertexMap = super.filter(offsetEntityVertexMap, traitPredicate);
+            offsetEntityVertexMap = super.filter(offsetEntityVertexMap, isEntityPredicate);
         }
 
-        super.filter(entityVertices);
+        offsetEntityVertexMap = super.filter(offsetEntityVertexMap);
 
         if (LOG.isDebugEnabled()) {
-            LOG.debug("<== ClassificationSearchProcessor.filter(): ret.size()={}", entityVertices.size());
+            LOG.debug("<== ClassificationSearchProcessor.filter(): ret.size()={}", offsetEntityVertexMap.size());
         }
+
+        return offsetEntityVertexMap;
     }
 
     @Override
diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
index a3ab6e3..f2290c6 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
@@ -463,6 +463,11 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
 
             ret.setApproximateCount(searchContext.getSearchProcessor().getResultCount());
 
+            String nextMarker = searchContext.getSearchProcessor().getNextMarker();
+            if (StringUtils.isNotEmpty(nextMarker)) {
+                ret.setNextMarker(nextMarker);
+            }
+
             // By default any attribute that shows up in the search parameter should be sent back in the response
             // If additional values are requested then the entityAttributes will be a superset of the all search attributes
             // and the explicitly requested attribute(s)
diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntitySearchProcessor.java b/repository/src/main/java/org/apache/atlas/discovery/EntitySearchProcessor.java
index 5dcff3b..f45ccaf 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/EntitySearchProcessor.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/EntitySearchProcessor.java
@@ -240,32 +240,26 @@ public class EntitySearchProcessor extends SearchProcessor {
         }
 
         try {
-            final int startIdx = context.getSearchParameters().getOffset();
-            final int limit    = context.getSearchParameters().getLimit();
+            final int limit       = context.getSearchParameters().getLimit();
+            final Integer marker  = context.getMarker();
+            final int startIdx    = marker != null ? marker : context.getSearchParameters().getOffset();
 
             // when subsequent filtering stages are involved, query should start at 0 even though startIdx can be higher
             //
             // first 'startIdx' number of entries will be ignored
-            int qryOffset = (nextProcessor != null || (graphQuery != null && indexQuery != null)) ? 0 : startIdx;
-            int resultIdx = qryOffset;
-
-            final List<AtlasVertex> entityVertices = new ArrayList<>();
-
-            SortOrder sortOrder = context.getSearchParameters().getSortOrder();
-            String sortBy = context.getSearchParameters().getSortBy();
-
-            final AtlasEntityType entityType = context.getEntityTypes().iterator().next();
-            AtlasAttribute sortByAttribute = entityType.getAttribute(sortBy);
-            if (sortByAttribute == null) {
-                sortBy = null;
+            // if marker is provided, start query with marker offset
+            int qryOffset;
+            if (marker != null) {
+                qryOffset = marker;
             } else {
-                sortBy = sortByAttribute.getVertexPropertyName();
+                qryOffset = (nextProcessor != null || (graphQuery != null && indexQuery != null)) ? 0 : startIdx;
             }
+            int resultIdx = qryOffset;
 
-            if (sortOrder == null) { sortOrder = ASCENDING; }
+            LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap = new LinkedHashMap<>();
 
             for (; ret.size() < limit; qryOffset += limit) {
-                entityVertices.clear();
+                offsetEntityVertexMap.clear();
 
                 if (context.terminateSearch()) {
                     LOG.warn("query terminated: {}", context.getSearchParameters());
@@ -277,41 +271,36 @@ public class EntitySearchProcessor extends SearchProcessor {
 
                 if (indexQuery != null) {
                     Iterator<AtlasIndexQuery.Result> idxQueryResult = executeIndexQuery(context, indexQuery, qryOffset, limit);
+                    offsetEntityVertexMap = getVerticesFromIndexQueryResult(idxQueryResult, offsetEntityVertexMap, qryOffset);
 
-                    getVerticesFromIndexQueryResult(idxQueryResult, entityVertices);
-
-                    isLastResultPage = entityVertices.size() < limit;
-
-                    // Do in-memory filtering before the graph query
-                    CollectionUtils.filter(entityVertices, inMemoryPredicate);
-
-                    if (graphQueryPredicate != null) {
-                        CollectionUtils.filter(entityVertices, graphQueryPredicate);
-                    }
                 } else {
                     Iterator<AtlasVertex> queryResult = graphQuery.vertices(qryOffset, limit).iterator();
+                    offsetEntityVertexMap = getVertices(queryResult, offsetEntityVertexMap, qryOffset);
+                }
 
-                    getVertices(queryResult, entityVertices);
-
-                    isLastResultPage = entityVertices.size() < limit;
-
-                    // Do in-memory filtering
-                    CollectionUtils.filter(entityVertices, inMemoryPredicate);
+                isLastResultPage = offsetEntityVertexMap.size() < limit;
+                // Do in-memory filtering
+                offsetEntityVertexMap = super.filter(offsetEntityVertexMap, inMemoryPredicate);
 
-                    //incase when operator is NEQ in pipeSeperatedSystemAttributes
-                    if (graphQueryPredicate != null) {
-                        CollectionUtils.filter(entityVertices, graphQueryPredicate);
-                    }
+                // in case the operator is NEQ in pipeSeperatedSystemAttributes
+                if (graphQueryPredicate != null) {
+                    offsetEntityVertexMap = super.filter(offsetEntityVertexMap, graphQueryPredicate);
                 }
 
-                super.filter(entityVertices);
+                offsetEntityVertexMap = super.filter(offsetEntityVertexMap);
 
-                resultIdx = collectResultVertices(ret, startIdx, limit, resultIdx, entityVertices);
+                resultIdx = collectResultVertices(ret, startIdx, limit, resultIdx, offsetEntityVertexMap, marker);
 
                 if (isLastResultPage) {
+                    resultIdx = MarkerUtil.MARKER_END - 1;
                     break;
                 }
             }
+
+            if (marker != null) {
+                nextOffset = resultIdx + 1;
+            }
+
         } finally {
             AtlasPerfTracer.log(perf);
         }
@@ -324,23 +313,25 @@ public class EntitySearchProcessor extends SearchProcessor {
     }
 
     @Override
-    public void filter(List<AtlasVertex> entityVertices) {
+    public LinkedHashMap<Integer, AtlasVertex> filter(LinkedHashMap<Integer,AtlasVertex> offsetEntityVertexMap) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("==> EntitySearchProcessor.filter({})", entityVertices.size());
+            LOG.debug("==> EntitySearchProcessor.filter({})", offsetEntityVertexMap.size());
         }
 
         // Since we already have the entity vertices, a in-memory filter will be faster than fetching the same
         // vertices again with the required filtering
         if (filterGraphQueryPredicate != null) {
             LOG.debug("Filtering in-memory");
-            CollectionUtils.filter(entityVertices, filterGraphQueryPredicate);
+            offsetEntityVertexMap = super.filter(offsetEntityVertexMap, filterGraphQueryPredicate);
         }
 
-        super.filter(entityVertices);
+        offsetEntityVertexMap = super.filter(offsetEntityVertexMap);
 
         if (LOG.isDebugEnabled()) {
-            LOG.debug("<== EntitySearchProcessor.filter(): ret.size()={}", entityVertices.size());
+            LOG.debug("<== EntitySearchProcessor.filter(): ret.size()={}", offsetEntityVertexMap.size());
         }
+
+        return offsetEntityVertexMap;
     }
 
     @Override
diff --git a/repository/src/main/java/org/apache/atlas/discovery/FreeTextSearchProcessor.java b/repository/src/main/java/org/apache/atlas/discovery/FreeTextSearchProcessor.java
index 92152ff..86f2cea 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/FreeTextSearchProcessor.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/FreeTextSearchProcessor.java
@@ -96,20 +96,23 @@ public class FreeTextSearchProcessor extends SearchProcessor {
         }
 
         try {
-            final int startIdx = context.getSearchParameters().getOffset();
-            final int limit    = context.getSearchParameters().getLimit();
+            final int limit       = context.getSearchParameters().getLimit();
+            final Integer marker  = context.getMarker();
+            final int startIdx    = marker != null ? marker : context.getSearchParameters().getOffset();
 
             // query to start at 0, even though startIdx can be higher - because few results in earlier retrieval could
             // have been dropped: like vertices of non-entity or non-active-entity
             //
             // first 'startIdx' number of entries will be ignored
-            int qryOffset = 0;
+            // if marker is provided, start query with marker offset
+            int qryOffset = marker != null ? marker : 0;
             int resultIdx = qryOffset;
 
-            final List<AtlasVertex> entityVertices = new ArrayList<>();
+            LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap = new LinkedHashMap<>();
+
             try {
                 for (; ret.size() < limit; qryOffset += limit) {
-                    entityVertices.clear();
+                    offsetEntityVertexMap.clear();
 
                     if (context.terminateSearch()) {
                         LOG.warn("query terminated: {}", context.getSearchParameters());
@@ -150,22 +153,27 @@ public class FreeTextSearchProcessor extends SearchProcessor {
                             }
                         }
 
-                        entityVertices.add(vertex);
+                        offsetEntityVertexMap.put((qryOffset + resultCount) - 1, vertex);
                     }
 
-                    isLastResultPage = resultCount < limit;
-
-                    super.filter(entityVertices);
+                    isLastResultPage      = resultCount < limit;
 
-                    resultIdx = collectResultVertices(ret, startIdx, limit, resultIdx, entityVertices);
+                    offsetEntityVertexMap = super.filter(offsetEntityVertexMap);
+                    resultIdx             = collectResultVertices(ret, startIdx, limit, resultIdx, offsetEntityVertexMap, marker);
 
                     if (isLastResultPage) {
+                        resultIdx = SearchContext.MarkerUtil.MARKER_END - 1;
                         break;
                     }
                 }
             } catch (Throwable t) {
                 throw t;
             }
+
+            if (marker != null) {
+                nextOffset = resultIdx + 1;
+            }
+
         } finally {
             AtlasPerfTracer.log(perf);
         }
diff --git a/repository/src/main/java/org/apache/atlas/discovery/FullTextSearchProcessor.java b/repository/src/main/java/org/apache/atlas/discovery/FullTextSearchProcessor.java
index b37d93a..2d8a448 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/FullTextSearchProcessor.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/FullTextSearchProcessor.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 
 import static org.apache.atlas.discovery.SearchContext.MATCH_ALL_NOT_CLASSIFIED;
@@ -96,21 +97,23 @@ public class FullTextSearchProcessor extends SearchProcessor {
         }
 
         try {
-            final int     startIdx   = context.getSearchParameters().getOffset();
             final int     limit      = context.getSearchParameters().getLimit();
             final boolean activeOnly = context.getSearchParameters().getExcludeDeletedEntities();
+            final Integer marker     = context.getMarker();
+            final int     startIdx   = marker != null ? marker : context.getSearchParameters().getOffset();
 
             // query to start at 0, even though startIdx can be higher - because few results in earlier retrieval could
             // have been dropped: like vertices of non-entity or non-active-entity
             //
             // first 'startIdx' number of entries will be ignored
-            int qryOffset = 0;
+            // if marker is provided, start query with marker offset
+            int qryOffset = marker != null ? marker : 0;
             int resultIdx = qryOffset;
 
-            final List<AtlasVertex> entityVertices = new ArrayList<>();
+            LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap = new LinkedHashMap<>();
 
             for (; ret.size() < limit; qryOffset += limit) {
-                entityVertices.clear();
+                offsetEntityVertexMap.clear();
 
                 if (context.terminateSearch()) {
                     LOG.warn("query terminated: {}", context.getSearchParameters());
@@ -141,19 +144,25 @@ public class FullTextSearchProcessor extends SearchProcessor {
                         continue;
                     }
 
-                    entityVertices.add(vertex);
+                    offsetEntityVertexMap.put((qryOffset + resultCount) - 1, vertex);
                 }
 
-                isLastResultPage = resultCount < limit;
+                isLastResultPage      = resultCount < limit;
 
-                super.filter(entityVertices);
+                offsetEntityVertexMap = super.filter(offsetEntityVertexMap);
 
-                resultIdx = collectResultVertices(ret, startIdx, limit, resultIdx, entityVertices);
+                resultIdx             = collectResultVertices(ret,startIdx, limit, resultIdx, offsetEntityVertexMap, marker);
 
                 if (isLastResultPage) {
+                    resultIdx         = SearchContext.MarkerUtil.MARKER_END - 1 ;
                     break;
                 }
             }
+
+            if (marker != null) {
+                nextOffset = resultIdx + 1;
+            }
+
         } finally {
             AtlasPerfTracer.log(perf);
         }
diff --git a/repository/src/main/java/org/apache/atlas/discovery/SearchContext.java b/repository/src/main/java/org/apache/atlas/discovery/SearchContext.java
index aa49121..01954d0 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/SearchContext.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/SearchContext.java
@@ -18,10 +18,10 @@
 package org.apache.atlas.discovery;
 
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.discovery.SearchParameters;
-import org.apache.atlas.model.discovery.SearchParameters.*;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.typedef.AtlasClassificationDef;
 import org.apache.atlas.repository.Constants;
@@ -43,11 +43,25 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.atlas.discovery.SearchProcessor.ALL_TYPE_QUERY;
-import static org.apache.atlas.model.discovery.SearchParameters.*;
+import static org.apache.atlas.model.discovery.SearchParameters.ALL_CLASSIFICATIONS;
+import static org.apache.atlas.model.discovery.SearchParameters.ALL_CLASSIFICATION_TYPES;
+import static org.apache.atlas.model.discovery.SearchParameters.ALL_ENTITY_TYPES;
+import static org.apache.atlas.model.discovery.SearchParameters.FilterCriteria;
+import static org.apache.atlas.model.discovery.SearchParameters.NO_CLASSIFICATIONS;
+import static org.apache.atlas.model.discovery.SearchParameters.WILDCARD_CLASSIFICATIONS;
 
 /*
  * Search context captures elements required for performing a basic search
@@ -71,6 +85,7 @@ public class SearchContext {
     private final String                  classificationTypeAndSubTypesQryStr;
     private boolean                       terminateSearch = false;
     private SearchProcessor               searchProcessor;
+    private Integer                       marker;
 
     public final static AtlasClassificationType MATCH_ALL_WILDCARD_CLASSIFICATION = new AtlasClassificationType(new AtlasClassificationDef(WILDCARD_CLASSIFICATIONS));
     public final static AtlasClassificationType MATCH_ALL_CLASSIFIED              = new AtlasClassificationType(new AtlasClassificationDef(ALL_CLASSIFICATIONS));
@@ -124,6 +139,10 @@ public class SearchContext {
             }
         }
 
+        if (StringUtils.isNotEmpty(searchParameters.getMarker())) {
+            marker = MarkerUtil.decodeMarker(searchParameters);
+        }
+
         //remove other types if builtin type is present
         filterStructTypes();
 
@@ -231,6 +250,8 @@ public class SearchContext {
 
     public Set<String> getClassificationNames() {return classificationNames;}
 
+    public Integer getMarker() { return marker; }
+
     public boolean includeEntityType(String entityType) {
         return typeAndSubTypes.isEmpty() || typeAndSubTypes.contains(entityType);
     }
@@ -238,9 +259,7 @@ public class SearchContext {
     public boolean includeClassificationTypes(Collection<String> traitNames) {
         final boolean ret;
 
-        if (CollectionUtils.isEmpty(classificationTypes) || classificationTypeAndSubTypes.isEmpty()) {
-            ret = true;
-        } else if (classificationTypes.iterator().next() == MATCH_ALL_NOT_CLASSIFIED) {
+        if (classificationTypes.iterator().next() == MATCH_ALL_NOT_CLASSIFIED) {
             ret = CollectionUtils.isEmpty(traitNames);
         } else if (classificationTypes.iterator().next() == MATCH_ALL_CLASSIFICATION_TYPES) {
             ret = CollectionUtils.isNotEmpty(traitNames);
@@ -503,4 +522,64 @@ public class SearchContext {
     private AtlasEntityType getTermEntityType() {
         return typeRegistry.getEntityTypeByName(TermSearchProcessor.ATLAS_GLOSSARY_TERM_ENTITY_TYPE);
     }
+
+    /** Builds and parses the pagination marker: Base64 of "<searchParameters hashCode>:<next offset>". */
+    public static class MarkerUtil {
+        private final static int IDX_HASH_CODE = 0;
+        private final static int IDX_OFFSET    = 1;
+
+        private final static String MARKER_DELIMITER = ":";
+
+        @VisibleForTesting final static String MARKER_START = "*"; // marker that decodes to offset 0
+        @VisibleForTesting final static int    MARKER_END   = -1;  // offset that getNextEncMarker emits un-encoded
+
+        /** Returns null for a null offset, "-1" for MARKER_END, otherwise the Base64 form of "hash:offset". */
+        public static String getNextEncMarker(SearchParameters searchParameters, Integer nextOffset) {
+            if (nextOffset == null) {
+                return null;
+            }
+
+            if (nextOffset == MARKER_END) {
+                return String.valueOf(nextOffset);
+            }
+
+            String value = searchParameters.hashCode() + MARKER_DELIMITER + nextOffset;
+            return Base64.getEncoder().encodeToString(value.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+        }
+
+        /** Decodes searchParameters' marker into a query offset; throws BAD_REQUEST when offset > 0 or the marker is invalid. */
+        public static Integer decodeMarker(SearchParameters searchParameters) throws AtlasBaseException {
+            if (searchParameters == null || searchParameters.getOffset() > 0) {
+                throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Marker can be used only if offset=0.");
+            }
+
+            String encodedMarker = searchParameters.getMarker();
+            if (StringUtils.equals(encodedMarker, MARKER_START)) {
+                return 0;
+            }
+
+            try {
+                String inputMarker = new String(Base64.getDecoder().decode(encodedMarker), java.nio.charset.StandardCharsets.UTF_8);
+                if (StringUtils.isEmpty(inputMarker) || !inputMarker.contains(MARKER_DELIMITER)) {
+                    throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Invalid marker found! Marker does not contain delimiter: " + MARKER_DELIMITER);
+                }
+
+                String[] str = inputMarker.split(MARKER_DELIMITER);
+                if (str == null || str.length != 2) {
+                    throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Invalid marker found! Decoding using delimiter did not yield correct result!");
+                }
+
+                int hashCode = Integer.parseInt(str[IDX_HASH_CODE]);
+                int offset   = Integer.parseInt(str[IDX_OFFSET]);
+                if (hashCode == searchParameters.hashCode() && offset >= 0) {
+                    return offset;
+                }
+
+                throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Invalid Marker! Parsing resulted in error.");
+            } catch (Exception e) {
+                // the reported message stays generic; the precise failure is attached as the cause instead of being dropped
+                throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, e, "Invalid marker!");
+            }
+        }
+    }
 }
diff --git a/repository/src/main/java/org/apache/atlas/discovery/SearchProcessor.java b/repository/src/main/java/org/apache/atlas/discovery/SearchProcessor.java
index f9832c3..f69dc42 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/SearchProcessor.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/SearchProcessor.java
@@ -39,6 +39,7 @@ import org.apache.atlas.util.AtlasGremlinQueryProvider;
 import org.apache.atlas.util.SearchPredicateUtil;
 import org.apache.atlas.util.SearchPredicateUtil.*;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.collections.Predicate;
 import org.apache.commons.collections.PredicateUtils;
 import org.apache.commons.lang.StringUtils;
@@ -52,6 +53,7 @@ import java.sql.Timestamp;
 import java.time.LocalDateTime;
 import java.util.*;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import static org.apache.atlas.SortOrder.ASCENDING;
 import static org.apache.atlas.discovery.SearchContext.MATCH_ALL_CLASSIFICATION_TYPES;
@@ -137,6 +139,7 @@ public abstract class SearchProcessor {
     protected       SearchProcessor        nextProcessor;
     protected       Predicate              inMemoryPredicate;
     protected       GraphIndexQueryBuilder graphIndexQueryBuilder;
+    protected       Integer                nextOffset;
 
     protected SearchProcessor(SearchContext context) {
         this.context = context;
@@ -151,6 +154,10 @@ public abstract class SearchProcessor {
         }
     }
 
+    public String getNextMarker() { // delegates to MarkerUtil.getNextEncMarker with the context's search parameters and nextOffset
+        return SearchContext.MarkerUtil.getNextEncMarker(context.getSearchParameters(), nextOffset);
+    }
+
     public abstract List<AtlasVertex> execute();
     public abstract long getResultCount();
 
@@ -181,28 +188,41 @@ public abstract class SearchProcessor {
                StringUtils.equals(attrName, CUSTOM_ATTRIBUTES_PROPERTY_KEY);
     }
 
-    protected int collectResultVertices(final List<AtlasVertex> ret, final int startIdx, final int limit, int resultIdx, final List<AtlasVertex> entityVertices) {
-        for (AtlasVertex entityVertex : entityVertices) {
-            resultIdx++;
+    protected int collectResultVertices(final List<AtlasVertex> ret, final int startIdx, final int limit, int resultIdx, final Map<Integer, AtlasVertex> offsetEntityVertexMap, Integer marker) {
+            int lastOffset = resultIdx;
 
-            if (resultIdx <= startIdx) {
-                continue;
-            }
+            for (Map.Entry<Integer, AtlasVertex> offsetToEntity : offsetEntityVertexMap.entrySet()) {
+                resultIdx++;
+
+                if (resultIdx <= startIdx) {
+                    continue;
+                }
 
-            ret.add(entityVertex);
+                lastOffset = offsetToEntity.getKey();
+                ret.add(offsetToEntity.getValue());
 
-            if (ret.size() == limit) {
-                break;
+                if (ret.size() == limit) {
+                    break;
+                }
             }
-        }
+            return marker == null ? resultIdx : lastOffset;
+    }
 
-        return resultIdx;
+    public LinkedHashMap<Integer, AtlasVertex> filter(LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap) {
+        if (nextProcessor != null && MapUtils.isNotEmpty(offsetEntityVertexMap)) {
+            return nextProcessor.filter(offsetEntityVertexMap);
+        }
+        return offsetEntityVertexMap;
     }
 
-    public void filter(List<AtlasVertex> entityVertices) {
-        if (nextProcessor != null && CollectionUtils.isNotEmpty(entityVertices)) {
-            nextProcessor.filter(entityVertices);
+    public LinkedHashMap<Integer, AtlasVertex> filter(LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap, Predicate predicate) {
+        if (predicate != null) {
+            offsetEntityVertexMap = offsetEntityVertexMap.entrySet()
+                    .stream()
+                    .filter(x -> predicate.evaluate(x.getValue()))
+                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (x, y) -> y, LinkedHashMap::new));
         }
+        return offsetEntityVertexMap;
     }
 
     protected Predicate buildTraitPredict(Set<AtlasClassificationType> classificationTypes) {
@@ -361,13 +381,13 @@ public abstract class SearchProcessor {
         return ret;
     }
 
-    protected void filterWhiteSpaceClassification(List<AtlasVertex> entityVertices) {
-        if (CollectionUtils.isNotEmpty(entityVertices)) {
-            final Iterator<AtlasVertex> it              = entityVertices.iterator();
-            final Set<String>           typeAndSubTypes = context.getClassificationTypeNames();
+    protected LinkedHashMap<Integer,AtlasVertex> filterWhiteSpaceClassification(LinkedHashMap<Integer,AtlasVertex> offsetEntityVertexMap) {
+        if (offsetEntityVertexMap != null) {
+            final Iterator<Map.Entry<Integer, AtlasVertex>> it = offsetEntityVertexMap.entrySet().iterator();
+            final Set<String>                  typeAndSubTypes = context.getClassificationTypeNames();
 
             while (it.hasNext()) {
-                AtlasVertex  entityVertex        = it.next();
+                AtlasVertex  entityVertex        = it.next().getValue();
                 List<String> classificationNames = AtlasGraphUtilsV2.getClassificationNames(entityVertex);
 
                 if (CollectionUtils.isNotEmpty(classificationNames)) {
@@ -387,6 +407,7 @@ public abstract class SearchProcessor {
                 it.remove();
             }
         }
+        return offsetEntityVertexMap;
     }
 
     protected void constructFilterQuery(StringBuilder indexQuery, Set<? extends AtlasStructType> structTypes, FilterCriteria filterCriteria, Set<String> indexAttributes) {
@@ -1205,6 +1226,18 @@ public abstract class SearchProcessor {
         return vertices;
     }
 
+    protected LinkedHashMap<Integer, AtlasVertex> getVerticesFromIndexQueryResult(Iterator<AtlasIndexQuery.Result> idxQueryResult, LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap, int qryOffset) {
+        // Stores each index-query result's vertex under consecutive offsets, the first being qryOffset;
+        // a null iterator leaves the map untouched.
+        int nextKey = qryOffset;
+
+        while (idxQueryResult != null && idxQueryResult.hasNext()) {
+            offsetEntityVertexMap.put(nextKey++, idxQueryResult.next().getVertex());
+        }
+
+        return offsetEntityVertexMap;
+    }
+
     protected Collection<AtlasVertex> getVertices(Iterator<AtlasVertex> iterator, Collection<AtlasVertex> vertices) {
         if (iterator != null) {
             while (iterator.hasNext()) {
@@ -1217,6 +1250,18 @@ public abstract class SearchProcessor {
         return vertices;
     }
 
+    protected LinkedHashMap<Integer, AtlasVertex> getVertices(Iterator<AtlasVertex> iterator, LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap, int qryOffset) {
+        // Maps every vertex yielded by the iterator to consecutive offsets, beginning at qryOffset.
+        if (iterator == null) {
+            return offsetEntityVertexMap;
+        }
+        for (int offset = qryOffset; iterator.hasNext(); offset++) {
+            offsetEntityVertexMap.put(offset, iterator.next());
+        }
+
+        return offsetEntityVertexMap;
+    }
+
     protected Set<String> getGuids(List<AtlasVertex> vertices) {
         Set<String> ret = new HashSet<>();
 
diff --git a/repository/src/main/java/org/apache/atlas/discovery/TermSearchProcessor.java b/repository/src/main/java/org/apache/atlas/discovery/TermSearchProcessor.java
index 45a8158..b8a507e 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/TermSearchProcessor.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/TermSearchProcessor.java
@@ -20,12 +20,15 @@ package org.apache.atlas.discovery;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.utils.AtlasPerfTracer;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
-
+import java.util.Map;
+import java.util.stream.Collectors;
 
 
 public class TermSearchProcessor extends SearchProcessor {
@@ -58,15 +61,22 @@ public class TermSearchProcessor extends SearchProcessor {
             perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "TermSearchProcessor.execute(" + context +  ")");
         }
 
+        //marker functionality will not work when there is need to fetch Term vertices and get entities from it
         try {
             if (CollectionUtils.isNotEmpty(assignedEntities)) {
+                LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap = new LinkedHashMap<>();
+
                 final int               startIdx = context.getSearchParameters().getOffset();
                 final int               limit    = context.getSearchParameters().getLimit();
                 final List<AtlasVertex> tmpList  = new ArrayList<>(assignedEntities);
 
-                super.filter(tmpList);
+                for (int i = 0; i < tmpList.size(); i++) {
+                    offsetEntityVertexMap.put(i, tmpList.get(i));
+                }
+
+                offsetEntityVertexMap = super.filter(offsetEntityVertexMap);
 
-                collectResultVertices(ret, startIdx, limit, 0, tmpList);
+                collectResultVertices(ret, startIdx, limit, 0, offsetEntityVertexMap, null);
             }
         } finally {
             AtlasPerfTracer.log(perf);
@@ -79,37 +89,41 @@ public class TermSearchProcessor extends SearchProcessor {
         return ret;
     }
 
+    //this filter is never used (per the author); it keeps only the entries whose vertex is one of assignedEntities
     @Override
-    public void filter(List<AtlasVertex> entityVertices) {
+    public LinkedHashMap<Integer, AtlasVertex> filter(LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("==> TermSearchProcessor.filter({})", entityVertices.size());
+            LOG.debug("==> TermSearchProcessor.filter({})", offsetEntityVertexMap.size());
         }
 
-        if (CollectionUtils.isNotEmpty(entityVertices)) {
+        if (MapUtils.isNotEmpty(offsetEntityVertexMap)) {
             if (CollectionUtils.isEmpty(assignedEntities)) {
-                entityVertices.clear();
+                offsetEntityVertexMap.clear();
             } else {
-                CollectionUtils.filter(entityVertices, o -> {
-                    if (o instanceof AtlasVertex) {
-                        AtlasVertex entityVertex = (AtlasVertex) o;
-
-                        for (AtlasVertex assignedEntity : assignedEntities) {
-                            if (assignedEntity.getId().equals(entityVertex.getId())) {
-                                return true;
+                offsetEntityVertexMap = offsetEntityVertexMap.entrySet().stream().
+                        filter(o -> {
+                            if (o.getValue() != null) {
+                                AtlasVertex entityVertex = o.getValue();
+
+                                for (AtlasVertex assignedEntity : assignedEntities) {
+                                    if (assignedEntity.getId().equals(entityVertex.getId())) {
+                                        return true;
+                                    }
+                                }
                             }
-                        }
-                    }
 
                     return false;
-                });
+                }).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (x, y) -> y, LinkedHashMap::new));
             }
         }
 
-        super.filter(entityVertices);
+        offsetEntityVertexMap = super.filter(offsetEntityVertexMap);
 
         if (LOG.isDebugEnabled()) {
-            LOG.debug("<== TermSearchProcessor.filter(): ret.size()={}", entityVertices.size());
+            LOG.debug("<== TermSearchProcessor.filter(): ret.size()={}", offsetEntityVertexMap.size());
         }
+
+        return offsetEntityVertexMap;
     }
 
     @Override
diff --git a/repository/src/test/java/org/apache/atlas/discovery/AtlasDiscoveryServiceTest.java b/repository/src/test/java/org/apache/atlas/discovery/AtlasDiscoveryServiceTest.java
index 027827a..a9fbd43 100644
--- a/repository/src/test/java/org/apache/atlas/discovery/AtlasDiscoveryServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/discovery/AtlasDiscoveryServiceTest.java
@@ -34,6 +34,7 @@ import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.repository.graph.AtlasGraphProvider;
 import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
 import org.testng.Assert;
 import org.testng.annotations.*;
 
@@ -73,67 +74,51 @@ public class AtlasDiscoveryServiceTest extends BasicTestSetup {
         SearchParameters params = new SearchParameters();
         params.setTermName(SALES_TERM+"@"+SALES_GLOSSARY);
 
-        List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
-        Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
-        assertEquals(entityHeaders.size(), 10);
+        assertSearchProcessorWithoutMarker(params, 10);
     }
 
     // TSP execute and CSP,ESP filter
     @Test
-    public void term_tag() throws AtlasBaseException {
+    public void termTag() throws AtlasBaseException {
         SearchParameters params = new SearchParameters();
         params.setTermName(SALES_TERM+"@"+SALES_GLOSSARY);
         params.setClassification(METRIC_CLASSIFICATION);
 
-        List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
-        Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
-        for(AtlasEntityHeader e : entityHeaders){
-            System.out.println(e.toString());
-        }
-        assertEquals(entityHeaders.size(), 4);
+        assertSearchProcessorWithoutMarker(params, 4);
     }
 
     @Test
-    public void term_entity() throws AtlasBaseException {
+    public void termEntity() throws AtlasBaseException {
         SearchParameters params = new SearchParameters();
         params.setTermName(SALES_TERM+"@"+SALES_GLOSSARY);
         params.setTypeName(HIVE_TABLE_TYPE);
 
-        List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
-        Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
-        assertEquals(entityHeaders.size(), 10);
+        assertSearchProcessorWithoutMarker(params, 10);
     }
 
     @Test
-    public void term_entity_tag() throws AtlasBaseException {
+    public void termEntityTag() throws AtlasBaseException {
         SearchParameters params = new SearchParameters();
         params.setTermName(SALES_TERM+"@"+SALES_GLOSSARY);
         params.setTypeName(HIVE_TABLE_TYPE);
         params.setClassification(DIMENSIONAL_CLASSIFICATION);
 
         List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
         Assert.assertTrue(CollectionUtils.isEmpty(entityHeaders));
     }
 
     //FSP execute and CSP,ESP filter
     @Test
-    public void query_ALLTag() throws AtlasBaseException {
+    public void queryALLTag() throws AtlasBaseException {
         SearchParameters params = new SearchParameters();
         params.setClassification(ALL_CLASSIFICATION_TYPES);
         params.setQuery("sales");
 
-        List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
-        Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
-        assertEquals(entityHeaders.size(), 5);
+        assertSearchProcessorWithoutMarker(params, 5);
     }
 
     @Test
-    public void query_ALLTag_tagFilter() throws AtlasBaseException {
+    public void queryALLTagTagFilter() throws AtlasBaseException {
         SearchParameters params = new SearchParameters();
         params.setClassification(ALL_CLASSIFICATION_TYPES);
         //typeName will check for only classification name not propogated classification
@@ -141,103 +126,79 @@ public class AtlasDiscoveryServiceTest extends BasicTestSetup {
         params.setTagFilters(fc);
         params.setQuery("sales");
 
-        List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
-        Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
-        assertEquals(entityHeaders.size(), 4);
+        assertSearchProcessorWithoutMarker(params, 4);
     }
 
     @Test
-    public void query_NOTCLASSIFIEDTag() throws AtlasBaseException {
+    public void queryNOTCLASSIFIEDTag() throws AtlasBaseException {
         SearchParameters params = new SearchParameters();
         params.setClassification(NO_CLASSIFICATIONS);
         params.setQuery("sales");
 
-        List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
-        Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
-        assertEquals(entityHeaders.size(), 11);
+        assertSearchProcessorWithoutMarker(params, 11);
     }
 
 
     @Test
-    public void query_ALLWildcardTag() throws AtlasBaseException {
+    public void queryALLWildcardTag() throws AtlasBaseException {
         SearchParameters params = new SearchParameters();
         params.setClassification("*");
         params.setQuery("sales");
 
-        List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
-        Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
-        assertEquals(entityHeaders.size(), 5);
+        assertSearchProcessorWithoutMarker(params, 5);
     }
 
     @Test
-    public void query_wildcardTag() throws AtlasBaseException {
+    public void queryWildcardTag() throws AtlasBaseException {
         SearchParameters params = new SearchParameters();
         params.setClassification("Dimen*on");
         params.setQuery("sales");
 
-        List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
-        Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
-        assertEquals(entityHeaders.size(), 2);
+        assertSearchProcessorWithoutMarker(params, 2);
     }
 
     @Test
-    public void query_tag() throws AtlasBaseException {
+    public void queryTag() throws AtlasBaseException {
         SearchParameters params = new SearchParameters();
         params.setClassification(METRIC_CLASSIFICATION);
         params.setQuery("sales");
 
-        List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
-        Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
-        assertEquals(entityHeaders.size(), 3);
+        assertSearchProcessorWithoutMarker(params, 3);
     }
 
     @Test
-    public void query_tag_tagFilter() throws AtlasBaseException {
+    public void queryTagTagFilter() throws AtlasBaseException {
         SearchParameters params = new SearchParameters();
         params.setClassification(METRIC_CLASSIFICATION);
         SearchParameters.FilterCriteria fc = getSingleFilterCondition("__timestamp", SearchParameters.Operator.LT, String.valueOf(System.currentTimeMillis()));
         params.setTagFilters(fc);
         params.setQuery("sales");
 
-        List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
-        Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
-        assertEquals(entityHeaders.size(), 3);
+        assertSearchProcessorWithoutMarker(params, 3);
     }
 
     @Test
-    public void query_entity() throws AtlasBaseException {
+    public void queryEntity() throws AtlasBaseException {
         SearchParameters params = new SearchParameters();
         params.setTypeName(HIVE_TABLE_TYPE);
         params.setQuery("sales");
 
-        List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
-        Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
-        assertEquals(entityHeaders.size(), 4);
+        assertSearchProcessorWithoutMarker(params, 4);
     }
 
     @Test
-    public void query_entity_entityFilter() throws AtlasBaseException {
+    public void queryEntityEntityFilter() throws AtlasBaseException {
         SearchParameters params = new SearchParameters();
         params.setTypeName(HIVE_TABLE_TYPE);
         SearchParameters.FilterCriteria fc = getSingleFilterCondition("tableType", Operator.NOT_NULL, "null");
         params.setEntityFilters(fc);
         params.setQuery("sales");
 
-        List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
-        Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
-        assertEquals(entityHeaders.size(), 3);
+        assertSearchProcessorWithoutMarker(params, 3);
     }
 
     @Test
-    public void query_entity_entityFilter_tag() throws AtlasBaseException {
+    public void queryEntityEntityFilterTag() throws AtlasBaseException {
         SearchParameters params = new SearchParameters();
         params.setTypeName(HIVE_TABLE_TYPE);
         SearchParameters.FilterCriteria fc = getSingleFilterCondition("tableType", Operator.IS_NULL, "null");
@@ -245,14 +206,11 @@ public class AtlasDiscoveryServiceTest extends BasicTestSetup {
         params.setClassification(DIMENSIONAL_CLASSIFICATION);
         params.setQuery("sales");
 
-        List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
-        Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
-        assertEquals(entityHeaders.size(), 1);
+        assertSearchProcessorWithoutMarker(params, 1);
     }
 
     @Test
-    public void query_entity_entityFilter_tag_tagFilter() throws AtlasBaseException {
+    public void queryEntityEntityFilterTagTagFilter() throws AtlasBaseException {
         SearchParameters params = new SearchParameters();
         params.setTypeName(HIVE_TABLE_TYPE);
         SearchParameters.FilterCriteria fcE = getSingleFilterCondition("tableType", Operator.IS_NULL, "null");
@@ -262,14 +220,11 @@ public class AtlasDiscoveryServiceTest extends BasicTestSetup {
         SearchParameters.FilterCriteria fcC = getSingleFilterCondition("attr1", Operator.EQ, "value1");
         params.setTagFilters(fcC);
 
-        List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
-        Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
-        assertEquals(entityHeaders.size(), 1);
+        assertSearchProcessorWithoutMarker(params, 1);
     }
 
     @Test
-    public void query_entity_tag_tagFilter() throws AtlasBaseException {
+    public void queryEntityTagTagFilter() throws AtlasBaseException {
         SearchParameters params = new SearchParameters();
         params.setTypeName(HIVE_TABLE_TYPE);
         params.setClassification(METRIC_CLASSIFICATION);
@@ -277,29 +232,22 @@ public class AtlasDiscoveryServiceTest extends BasicTestSetup {
         params.setTagFilters(fc);
         params.setQuery("sales");
 
-        List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
-        Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
-        assertEquals(entityHeaders.size(), 2);
-
+        assertSearchProcessorWithoutMarker(params, 2);
     }
 
     @Test
-    public void query_entity_tag() throws AtlasBaseException {
+    public void queryEntityTag() throws AtlasBaseException {
         SearchParameters params = new SearchParameters();
         params.setTypeName(HIVE_TABLE_TYPE);
         params.setClassification(METRIC_CLASSIFICATION);
         params.setQuery("sales");
 
-        List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
-        Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
-        assertEquals(entityHeaders.size(), 2);
+        assertSearchProcessorWithoutMarker(params, 2);
     }
 
     // CSP Execute and ESP filter
     @Test
-    public void entity_entityFilter_tag_tagFilter() throws AtlasBaseException {
+    public void entityEntityFilterTagTagFilter() throws AtlasBaseException {
         SearchParameters params = new SearchParameters();
         params.setTypeName(HIVE_TABLE_TYPE);
         SearchParameters.FilterCriteria fcE = getSingleFilterCondition("tableType", Operator.EQ, "Managed");
@@ -308,27 +256,171 @@ public class AtlasDiscoveryServiceTest extends BasicTestSetup {
         SearchParameters.FilterCriteria fcC = getSingleFilterCondition("__timestamp", SearchParameters.Operator.LT, String.valueOf(System.currentTimeMillis()));
         params.setTagFilters(fcC);
 
-        List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
-
-        Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
-        assertEquals(entityHeaders.size(), 4);
-
+        assertSearchProcessorWithoutMarker(params, 4);
     }
 
     @Test
-    public void entity_tag_tagFilter() throws AtlasBaseException {
+    public void entityTagTagFilter() throws AtlasBaseException {
         SearchParameters params = new SearchParameters();
         params.setTypeName(HIVE_TABLE_TYPE);
         params.setClassification(METRIC_CLASSIFICATION);
         SearchParameters.FilterCriteria fc = getSingleFilterCondition("__timestamp", SearchParameters.Operator.LT, String.valueOf(System.currentTimeMillis()));
         params.setTagFilters(fc);
 
+        assertSearchProcessorWithoutMarker(params, 4);
+    }
+
+    @Test
+    public void searchWith0offsetMarker() throws AtlasBaseException {
+        SearchParameters params = new SearchParameters();
+        params.setTypeName(HIVE_TABLE_TYPE);
+        params.setOffset(0);
+        params.setMarker(SearchContext.MarkerUtil.MARKER_START);
+        params.setLimit(5);
+
+        assertSearchProcessorWithMarker(params, 5);
+    }
+
+    @Test
+    public void searchWithNoOffsetMarker() throws AtlasBaseException {
+        SearchParameters params = new SearchParameters();
+        params.setTypeName(HIVE_TABLE_TYPE);
+        params.setMarker(SearchContext.MarkerUtil.MARKER_START);
+        params.setLimit(5);
+
+        assertSearchProcessorWithMarker(params, 5);
+    }
+
+    @Test
+    public void searchWithGreaterThan0OffsetBlankMarker() throws AtlasBaseException {
+        SearchParameters params = new SearchParameters();
+        params.setTypeName(HIVE_TABLE_TYPE);
+        params.setOffset(1);
+        params.setMarker("");
+        params.setLimit(5);
+
+        assertSearchProcessorWithoutMarker(params, 5);
+    }
+
+    @Test(expectedExceptions = AtlasBaseException.class, expectedExceptionsMessageRegExp = "Marker can be used only if offset=0.")
+    public void searchWithGreaterThan0OffsetMarker() throws AtlasBaseException {
+        SearchParameters params = new SearchParameters();
+        params.setTypeName(HIVE_TABLE_TYPE);
+        params.setOffset(1);
+        params.setMarker(SearchContext.MarkerUtil.MARKER_START);
+        params.setLimit(5);
         List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
 
+        assertNotNull(entityHeaders);
+    }
+
+    @Test
+    public void searchWithMarkerSet() throws AtlasBaseException {
+        SearchParameters params = new SearchParameters();
+        params.setTypeName(HIVE_TABLE_TYPE);
+        params.setMarker(SearchContext.MarkerUtil.MARKER_START);
+        params.setLimit(5);
+        AtlasSearchResult searchResult        = discoveryService.searchWithParameters(params);
+        List<AtlasEntityHeader> entityHeaders = searchResult.getEntities();
+
+        Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
+        assertEquals(entityHeaders.size(), 5);
+        Assert.assertTrue(StringUtils.isNotEmpty(searchResult.getNextMarker()));
+
+        // feed the marker returned with the first page into the request for the second page
+        params.setMarker(searchResult.getNextMarker());
+        AtlasSearchResult nextsearchResult        = discoveryService.searchWithParameters(params);
+        List<AtlasEntityHeader> nextentityHeaders = nextsearchResult.getEntities();
+
+        Assert.assertTrue(CollectionUtils.isNotEmpty(nextentityHeaders));
+        Assert.assertTrue(StringUtils.isNotEmpty(nextsearchResult.getNextMarker()));
+        // a second page shorter than the limit must be flagged as the last one (marker "-1")
+        if (nextentityHeaders.size() < params.getLimit()) {
+            assertEquals(nextsearchResult.getNextMarker(), String.valueOf(-1));
+        }
+    }
+
+    @Test(expectedExceptions = AtlasBaseException.class, expectedExceptionsMessageRegExp = "Invalid marker!")
+    public void searchWithInvalidMarker() throws AtlasBaseException {
+        SearchParameters params = new SearchParameters();
+        params.setTypeName(HIVE_TABLE_TYPE);
+        params.setMarker(SearchContext.MarkerUtil.MARKER_START);
+        params.setLimit(5);
+        AtlasSearchResult searchResult        = discoveryService.searchWithParameters(params);
+        List<AtlasEntityHeader> entityHeaders = searchResult.getEntities();
+
+        Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
+        assertEquals(entityHeaders.size(), 5);
+        Assert.assertTrue(StringUtils.isNotEmpty(searchResult.getNextMarker()));
+
+        // corrupt the marker returned with the first page before sending it back
+        params.setMarker(searchResult.getNextMarker()+"abc");
+        // this call is the one expected to throw AtlasBaseException("Invalid marker!")
+        discoveryService.searchWithParameters(params);
+    }
+
+    @Test
+    public void searchWithLastPageMarker() throws AtlasBaseException {
+        SearchParameters params = new SearchParameters();
+        params.setTypeName(HIVE_TABLE_TYPE);
+        params.setExcludeDeletedEntities(true);
+        params.setMarker(SearchContext.MarkerUtil.MARKER_START);
+        params.setLimit(5);
+        AtlasSearchResult searchResult        = discoveryService.searchWithParameters(params);
+        List<AtlasEntityHeader> entityHeaders = searchResult.getEntities();
+
         Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
-        assertEquals(entityHeaders.size(), 4);
+        assertEquals(entityHeaders.size(), 5);
+        Assert.assertTrue(StringUtils.isNotEmpty(searchResult.getNextMarker()));
+
+        long maxEntities = searchResult.getApproximateCount();
+
+        // restart from the first page with a limit above the approximate total, so one page holds everything
+        params.setMarker(SearchContext.MarkerUtil.MARKER_START);
+        params.setLimit((int)maxEntities + 10);
+        AtlasSearchResult nextsearchResult = discoveryService.searchWithParameters(params);
+
+        assertEquals(nextsearchResult.getNextMarker(), "-1");
     }
 
+
+    @Test // marker is not supported for term search: the result is expected to carry no next marker
+    public void termMarker() throws AtlasBaseException {
+        SearchParameters params = new SearchParameters();
+        params.setTermName(SALES_TERM+"@"+SALES_GLOSSARY);
+        params.setMarker("*");
+
+        assertSearchProcessorWithoutMarker(params, 10);
+
+    }
+
+    @Test
+    public void queryEntityTagMarker() throws AtlasBaseException {
+        SearchParameters params = new SearchParameters();
+        params.setTypeName(HIVE_TABLE_TYPE);
+        params.setClassification(METRIC_CLASSIFICATION);
+        params.setQuery("sales");
+        params.setMarker("*");
+        params.setLimit(5);
+
+        assertSearchProcessorWithMarker(params, 2);
+    }
+
+    // CSP executes, ESP filters (presumably ClassificationSearchProcessor / EntitySearchProcessor); no next marker is expected here
+    @Test
+    public void entityEntityFilterTagTagFilterMarker() throws AtlasBaseException {
+        SearchParameters params = new SearchParameters();
+        params.setTypeName(HIVE_TABLE_TYPE);
+        SearchParameters.FilterCriteria fcE = getSingleFilterCondition("tableType", Operator.EQ, "Managed");
+        params.setEntityFilters(fcE);
+        params.setClassification(METRIC_CLASSIFICATION);
+        SearchParameters.FilterCriteria fcC = getSingleFilterCondition("__timestamp", SearchParameters.Operator.LT, String.valueOf(System.currentTimeMillis()));
+        params.setTagFilters(fcC);
+        params.setMarker("*");
+        assertSearchProcessorWithoutMarker(params, 4);
+    }
+
+
     String spChar1   = "default.test_dot_name";
     String spChar2   = "default.test_dot_name@db.test_db";
     String spChar3   = "default.test_dot_name_12.col1@db1";
@@ -794,6 +886,29 @@ public class AtlasDiscoveryServiceTest extends BasicTestSetup {
         entityStore.addClassification(Arrays.asList(guid), new AtlasClassification(DIMENSIONAL_CLASSIFICATION, attr));
     }
 
+
+    private void assertSearchProcessorWithoutMarker(SearchParameters params, int expected) throws AtlasBaseException {
+        assertSearchProcessor(params, expected, false);
+    }
+
+    private void assertSearchProcessorWithMarker(SearchParameters params, int expected) throws AtlasBaseException {
+        assertSearchProcessor(params, expected, true);
+    }
+
+    private void assertSearchProcessor(SearchParameters params, int expected, boolean checkMarker) throws AtlasBaseException {
+        AtlasSearchResult searchResult = discoveryService.searchWithParameters(params);
+        List<AtlasEntityHeader> entityHeaders = searchResult.getEntities();
+
+        Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
+        assertEquals(entityHeaders.size(), expected);
+
+        if (checkMarker) {
+            Assert.assertTrue(StringUtils.isNotEmpty(searchResult.getNextMarker()));
+        } else {
+            Assert.assertTrue(StringUtils.isEmpty(searchResult.getNextMarker()));
+        }
+    }
+
     @AfterClass
     public void teardown() throws Exception {
         AtlasGraphProvider.cleanup();
diff --git a/repository/src/test/java/org/apache/atlas/discovery/ClassificationSearchProcessorTest.java b/repository/src/test/java/org/apache/atlas/discovery/ClassificationSearchProcessorTest.java
index e1ebbfc..121dca9 100644
--- a/repository/src/test/java/org/apache/atlas/discovery/ClassificationSearchProcessorTest.java
+++ b/repository/src/test/java/org/apache/atlas/discovery/ClassificationSearchProcessorTest.java
@@ -33,6 +33,7 @@ import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream;
 import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -263,6 +264,62 @@ public class ClassificationSearchProcessorTest extends BasicTestSetup {
 
     }
 
+    @Test
+    public void searchByWildcardTagMarker() throws AtlasBaseException {
+        final String LAST_MARKER = "-1";
+        SearchParameters params = new SearchParameters();
+        params.setClassification("*");
+        int limit     = 5;
+        String marker = "*";
+        params.setLimit(limit);
+
+        while (!StringUtils.equals(marker, LAST_MARKER)) {
+            params.setMarker(marker);
+            SearchContext context = new SearchContext(params, typeRegistry, graph, indexer.getVertexIndexKeys());
+            ClassificationSearchProcessor processor = new ClassificationSearchProcessor(context);
+            List<AtlasVertex> vertices = processor.execute();
+            int pageSize = vertices.size();
+            marker = processor.getNextMarker();
+
+            if (pageSize < limit) {
+                assertEquals(marker, LAST_MARKER);
+                break;
+            } else {
+                Assert.assertNotNull(marker);
+                assertEquals(pageSize, limit);
+            }
+        }
+    }
+
+    @Test   //marker functionality is not supported in this case
+    public void searchByTagAndGraphSysFiltersMarker() throws AtlasBaseException {
+        SearchParameters params = new SearchParameters();
+        params.setClassification(DIMENSION_CLASSIFICATION);
+        FilterCriteria filterCriteria = getSingleFilterCondition("__entityStatus", Operator.EQ, "DELETED");
+        params.setTagFilters(filterCriteria);
+        params.setExcludeDeletedEntities(false);
+        params.setLimit(20);
+        params.setMarker("*");
+
+        SearchContext context = new SearchContext(params, typeRegistry, graph, indexer.getVertexIndexKeys());
+        ClassificationSearchProcessor processor = new ClassificationSearchProcessor(context);
+        List<AtlasVertex> vertices = processor.execute();
+
+        Assert.assertTrue(CollectionUtils.isNotEmpty(vertices));
+        assertEquals(vertices.size(), 1);
+        List<String> guids = vertices.stream().map(g -> {
+            try {
+                return entityRetriever.toAtlasEntityHeader(g).getGuid();
+            } catch (AtlasBaseException e) {
+                fail("Failure in mapping vertex to AtlasEntityHeader");
+            }
+            return "";
+        }).collect(Collectors.toList());
+        Assert.assertTrue(guids.contains(dimensionTagDeleteGuid));
+
+        Assert.assertNull(processor.getNextMarker());
+    }
+
     private void createDimensionTaggedEntityAndDelete() throws AtlasBaseException {
         AtlasEntity entityToDelete = new AtlasEntity(HIVE_TABLE_TYPE);
         entityToDelete.setAttribute("name", "entity to be deleted");
diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java
index 4c7b622..e4c74a9 100644
--- a/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java
+++ b/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java
@@ -197,7 +197,8 @@ public class DiscoveryREST {
                                               @QueryParam("sortOrder")              SortOrder sortOrder,
                                               @QueryParam("excludeDeletedEntities") boolean excludeDeletedEntities,
                                               @QueryParam("limit")                  int     limit,
-                                              @QueryParam("offset")                 int     offset) throws AtlasBaseException {
+                                              @QueryParam("offset")                 int     offset,
+                                              @QueryParam("marker")                 String  marker) throws AtlasBaseException {
         Servlets.validateQueryParamLength("typeName", typeName);
         Servlets.validateQueryParamLength("classification", classification);
         Servlets.validateQueryParamLength("sortBy", sortByAttribute);
@@ -220,6 +221,7 @@ public class DiscoveryREST {
             searchParameters.setExcludeDeletedEntities(excludeDeletedEntities);
             searchParameters.setLimit(limit);
             searchParameters.setOffset(offset);
+            searchParameters.setMarker(marker);
             searchParameters.setSortBy(sortByAttribute);
             searchParameters.setSortOrder(sortOrder);