You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/07/14 23:02:46 UTC

[2/2] incubator-atlas git commit: ATLAS-1947: AtlasSearchResult to include referredEntity headers

ATLAS-1947: AtlasSearchResult to include referredEntity headers


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/bcec42e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/bcec42e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/bcec42e3

Branch: refs/heads/master
Commit: bcec42e3306c9517c1ded5e7ed538c76cfd29c33
Parents: 0d8f9f8
Author: apoorvnaik <ap...@apache.org>
Authored: Thu Jul 13 09:03:06 2017 -0700
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Fri Jul 14 16:02:38 2017 -0700

----------------------------------------------------------------------
 .../model/discovery/AtlasSearchResult.java      |  78 ++-
 .../atlas/model/discovery/SearchParameters.java |  34 +-
 .../atlas/discovery/AtlasDiscoveryService.java  |   2 +-
 .../ClassificationSearchProcessor.java          | 198 ++++++
 .../atlas/discovery/EntityDiscoveryService.java | 138 +++--
 .../atlas/discovery/EntitySearchProcessor.java  | 203 ++++++
 .../discovery/FullTextSearchProcessor.java      | 110 ++++
 .../org/apache/atlas/discovery/GremlinStep.java | 389 ------------
 .../apache/atlas/discovery/SearchContext.java   | 126 ++++
 .../apache/atlas/discovery/SearchPipeline.java  | 611 -------------------
 .../apache/atlas/discovery/SearchProcessor.java | 381 ++++++++++++
 .../org/apache/atlas/discovery/SolrStep.java    | 288 ---------
 .../store/graph/v1/EntityGraphRetriever.java    |  20 +-
 .../org/apache/atlas/util/SearchTracker.java    |  16 +-
 .../test/java/org/apache/atlas/TestModules.java |   3 -
 .../atlas/web/resources/AdminResource.java      |   4 +-
 .../apache/atlas/web/rest/DiscoveryREST.java    |   2 +-
 17 files changed, 1216 insertions(+), 1387 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bcec42e3/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java b/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java
index 9513dcb..5827440 100644
--- a/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java
+++ b/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java
@@ -31,6 +31,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE;
@@ -40,14 +41,15 @@ import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONL
 @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class AtlasSearchResult implements Serializable {
-    private AtlasQueryType            queryType;
-    private SearchParameters          searchParameters;
-    private String                    queryText;
-    private String                    type;
-    private String                    classification;
-    private List<AtlasEntityHeader>   entities;
-    private AttributeSearchResult     attributes;
-    private List<AtlasFullTextResult> fullTextResult;
+    private AtlasQueryType                 queryType;
+    private SearchParameters               searchParameters;
+    private String                         queryText;
+    private String                         type;
+    private String                         classification;
+    private List<AtlasEntityHeader>        entities;
+    private AttributeSearchResult          attributes;
+    private List<AtlasFullTextResult>      fullTextResult;
+    private Map<String, AtlasEntityHeader> referredEntities;
 
     public AtlasSearchResult() {}
 
@@ -62,6 +64,7 @@ public class AtlasSearchResult implements Serializable {
         setEntities(null);
         setAttributes(null);
         setFullTextResult(null);
+        setReferredEntities(null);
     }
 
     public AtlasSearchResult(SearchParameters searchParameters) {
@@ -73,6 +76,7 @@ public class AtlasSearchResult implements Serializable {
             setEntities(null);
             setAttributes(null);
             setFullTextResult(null);
+            setReferredEntities(null);
         }
     }
 
@@ -80,6 +84,14 @@ public class AtlasSearchResult implements Serializable {
 
     public void setQueryType(AtlasQueryType queryType) { this.queryType = queryType; }
 
+    public SearchParameters getSearchParameters() {
+        return searchParameters;
+    }
+
+    public void setSearchParameters(SearchParameters searchParameters) {
+        this.searchParameters = searchParameters;
+    }
+
     public String getQueryText() { return queryText; }
 
     public void setQueryText(String queryText) { this.queryText = queryText; }
@@ -104,6 +116,17 @@ public class AtlasSearchResult implements Serializable {
 
     public void setFullTextResult(List<AtlasFullTextResult> fullTextResult) { this.fullTextResult = fullTextResult; }
 
+    public Map<String, AtlasEntityHeader> getReferredEntities() {
+        return referredEntities;
+    }
+
+    public void setReferredEntities(Map<String, AtlasEntityHeader> referredEntities) {
+        this.referredEntities = referredEntities;
+    }
+
+    @Override
+    public int hashCode() { return Objects.hash(queryType, searchParameters, queryText, type, classification, entities, attributes, fullTextResult, referredEntities); }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
@@ -116,24 +139,8 @@ public class AtlasSearchResult implements Serializable {
                Objects.equals(classification, that.classification) &&
                Objects.equals(entities, that.entities) &&
                Objects.equals(attributes, that.attributes) &&
-               Objects.equals(fullTextResult, that.fullTextResult);
-    }
-
-    @Override
-    public int hashCode() { return Objects.hash(queryType, searchParameters, queryText, type, classification, entities, attributes, fullTextResult); }
-
-    @Override
-    public String toString() {
-        return "AtlasSearchResult{" +
-                "queryType=" + queryType +
-                ", searchParameters='" + searchParameters + '\'' +
-                ", queryText='" + queryText + '\'' +
-                ", type=" + type +
-                ", classification=" + classification +
-                ", entities=" + entities +
-                ", attributes=" + attributes +
-                ", fullTextResult=" + fullTextResult +
-                '}';
+               Objects.equals(fullTextResult, that.fullTextResult) &&
+               Objects.equals(referredEntities, that.referredEntities);
     }
 
     public void addEntity(AtlasEntityHeader newEntity) {
@@ -163,12 +170,19 @@ public class AtlasSearchResult implements Serializable {
         }
     }
 
-    public void setSearchParameters(SearchParameters searchParameters) {
-        this.searchParameters = searchParameters;
-    }
-
-    public SearchParameters getSearchParameters() {
-        return searchParameters;
+    @Override
+    public String toString() {
+        return "AtlasSearchResult{" +
+                "queryType=" + queryType +
+                ", searchParameters='" + searchParameters + '\'' +
+                ", queryText='" + queryText + '\'' +
+                ", type=" + type +
+                ", classification=" + classification +
+                ", entities=" + entities +
+                ", attributes=" + attributes +
+                ", fullTextResult=" + fullTextResult +
+                ", referredEntities=" + referredEntities +
+                '}';
     }
 
     public enum AtlasQueryType { DSL, FULL_TEXT, GREMLIN, BASIC, ATTRIBUTE }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bcec42e3/intg/src/main/java/org/apache/atlas/model/discovery/SearchParameters.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/discovery/SearchParameters.java b/intg/src/main/java/org/apache/atlas/model/discovery/SearchParameters.java
index 30855dc..972c11e 100644
--- a/intg/src/main/java/org/apache/atlas/model/discovery/SearchParameters.java
+++ b/intg/src/main/java/org/apache/atlas/model/discovery/SearchParameters.java
@@ -207,9 +207,12 @@ public class SearchParameters {
         return Objects.hash(query, typeName, classification, excludeDeletedEntities, limit, offset, entityFilters, tagFilters, attributes);
     }
 
-    @Override
-    public String toString() {
-        final StringBuilder sb = new StringBuilder("SearchParameters{");
+    public StringBuilder toString(StringBuilder sb) {
+        if (sb == null) {
+            sb = new StringBuilder();
+        }
+
+        sb.append('{');
         sb.append("query='").append(query).append('\'');
         sb.append(", typeName='").append(typeName).append('\'');
         sb.append(", classification='").append(classification).append('\'');
@@ -220,7 +223,13 @@ public class SearchParameters {
         sb.append(", tagFilters=").append(tagFilters);
         sb.append(", attributes=").append(attributes);
         sb.append('}');
-        return sb.toString();
+
+        return sb;
+    }
+
+    @Override
+    public String toString() {
+        return toString(new StringBuilder()).toString();
     }
 
 
@@ -297,16 +306,25 @@ public class SearchParameters {
             return Objects.hash(attributeName, operator, attributeValue, condition, criterion);
         }
 
-        @Override
-        public String toString() {
-            final StringBuilder sb = new StringBuilder("FilterCriteria{");
+        public StringBuilder toString(StringBuilder sb) {
+            if (sb == null) {
+                sb = new StringBuilder();
+            }
+
+            sb.append('{');
             sb.append("attributeName='").append(attributeName).append('\'');
             sb.append(", operator=").append(operator);
             sb.append(", attributeValue='").append(attributeValue).append('\'');
             sb.append(", condition=").append(condition);
             sb.append(", criterion=").append(criterion);
             sb.append('}');
-            return sb.toString();
+
+            return sb;
+        }
+
+        @Override
+        public String toString() {
+            return toString(new StringBuilder()).toString();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bcec42e3/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java
index 030a957..764b548 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java
@@ -64,5 +64,5 @@ public interface AtlasDiscoveryService {
      * @return Matching entities
      * @throws AtlasBaseException
      */
-    AtlasSearchResult searchUsingBasicQuery(SearchParameters searchParameters) throws AtlasBaseException;
+    AtlasSearchResult searchWithParameters(SearchParameters searchParameters) throws AtlasBaseException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bcec42e3/repository/src/main/java/org/apache/atlas/discovery/ClassificationSearchProcessor.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/ClassificationSearchProcessor.java b/repository/src/main/java/org/apache/atlas/discovery/ClassificationSearchProcessor.java
new file mode 100644
index 0000000..77b2c7c
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/discovery/ClassificationSearchProcessor.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.discovery;
+
+import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.*;
+import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1;
+import org.apache.atlas.type.AtlasClassificationType;
+import org.apache.atlas.utils.AtlasPerfTracer;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+
+public class ClassificationSearchProcessor extends SearchProcessor {
+    private static final Logger LOG      = LoggerFactory.getLogger(ClassificationSearchProcessor.class);
+    private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("ClassificationSearchProcessor");
+
+    private final AtlasIndexQuery indexQuery;
+    private final AtlasGraphQuery allGraphQuery;
+    private final AtlasGraphQuery filterGraphQuery;
+
+    public ClassificationSearchProcessor(SearchContext context) {
+        super(context);
+
+        AtlasClassificationType classificationType = context.getClassificationType();
+        FilterCriteria          filterCriteria     = context.getSearchParameters().getTagFilters();
+        Set<String>             typeAndSubTypes    = classificationType.getTypeAndAllSubTypes();
+        Set<String>             solrAttributes     = new HashSet<>();
+        Set<String>             gremlinAttributes  = new HashSet<>();
+        Set<String>             allAttributes      = new HashSet<>();
+
+
+        processSearchAttributes(classificationType, filterCriteria, solrAttributes, gremlinAttributes, allAttributes);
+
+        // for classification search, if any attribute can't be handled by Solr - switch to all Gremlin
+        boolean useSolrSearch = typeAndSubTypes.size() <= MAX_CLASSIFICATION_TYPES_IN_INDEX_QUERY && CollectionUtils.isEmpty(gremlinAttributes) && canApplySolrFilter(classificationType, filterCriteria, false);
+
+        if (useSolrSearch) {
+            StringBuilder solrQuery = new StringBuilder();
+
+            constructTypeTestQuery(solrQuery, typeAndSubTypes);
+            constructFilterQuery(solrQuery, classificationType, filterCriteria, solrAttributes);
+
+            String solrQueryString = STRAY_AND_PATTERN.matcher(solrQuery).replaceAll(")");
+
+            solrQueryString = STRAY_OR_PATTERN.matcher(solrQueryString).replaceAll(")");
+            solrQueryString = STRAY_ELIPSIS_PATTERN.matcher(solrQueryString).replaceAll("");
+
+            indexQuery = context.getGraph().indexQuery(Constants.VERTEX_INDEX, solrQueryString);
+        } else {
+            indexQuery = null;
+        }
+
+        AtlasGraphQuery query = context.getGraph().query().in(Constants.TYPE_NAME_PROPERTY_KEY, typeAndSubTypes);
+
+        allGraphQuery = toGremlinFilterQuery(classificationType, filterCriteria, allAttributes, query);
+
+        query = context.getGraph().query().in(Constants.TRAIT_NAMES_PROPERTY_KEY, typeAndSubTypes);
+
+        filterGraphQuery = query; // TODO: filer based on tag attributes
+    }
+
+    @Override
+    public List<AtlasVertex> execute() {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> ClassificationSearchProcessor.execute({})", context);
+        }
+
+        List<AtlasVertex> ret = new ArrayList<>();
+
+        AtlasPerfTracer perf = null;
+
+        if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+            perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "ClassificationSearchProcessor.execute(" + context +  ")");
+        }
+
+        try {
+            int         qryOffset      = (nextProcessor == null) ? context.getSearchParameters().getOffset() : 0;
+            int         limit          = context.getSearchParameters().getLimit();
+            int         resultIdx      = qryOffset;
+            Set<String> processedGuids = new HashSet<>();
+
+            while (ret.size() < limit) {
+                if (context.terminateSearch()) {
+                    LOG.warn("query terminated: {}", context.getSearchParameters());
+
+                    break;
+                }
+
+                List<AtlasVertex> classificationVertices;
+
+                if (indexQuery != null) {
+                    Iterator<AtlasIndexQuery.Result> queryResult = indexQuery.vertices(qryOffset, limit);
+
+                    if (!queryResult.hasNext()) { // no more results from solr - end of search
+                        break;
+                    }
+
+                    classificationVertices = getVerticesFromIndexQueryResult(queryResult);
+                } else {
+                    Iterator<AtlasVertex> queryResult = allGraphQuery.vertices(qryOffset, limit).iterator();
+
+                    if (!queryResult.hasNext()) { // no more results - end of search
+                        break;
+                    }
+
+                    classificationVertices = getVertices(queryResult);
+                }
+
+                qryOffset += limit;
+
+                List<AtlasVertex> entityVertices = new ArrayList<>();
+
+                for (AtlasVertex classificationVertex : classificationVertices) {
+                    Iterable<AtlasEdge> edges = classificationVertex.getEdges(AtlasEdgeDirection.IN);
+
+                    for (AtlasEdge edge : edges) {
+                        AtlasVertex entityVertex = edge.getOutVertex();
+                        String      guid         = AtlasGraphUtilsV1.getIdFromVertex(entityVertex);
+
+                        if (!processedGuids.contains(guid)) {
+                            if (!context.getSearchParameters().getExcludeDeletedEntities() || AtlasGraphUtilsV1.getState(entityVertex) == AtlasEntity.Status.ACTIVE) {
+                                entityVertices.add(entityVertex);
+                            }
+
+                            processedGuids.add(guid);
+                        }
+                    }
+                }
+
+                entityVertices = super.filter(entityVertices);
+
+                for (AtlasVertex entityVertex : entityVertices) {
+                    resultIdx++;
+
+                    if (resultIdx < context.getSearchParameters().getOffset()) {
+                        continue;
+                    }
+
+                    ret.add(entityVertex);
+
+                    if (ret.size() == limit) {
+                        break;
+                    }
+                }
+            }
+        } finally {
+            AtlasPerfTracer.log(perf);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== ClassificationSearchProcessor.execute({}): ret.size()={}", context, ret.size());
+        }
+
+        return ret;
+    }
+
+    @Override
+    public List<AtlasVertex> filter(List<AtlasVertex> entityVertices) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> ClassificationSearchProcessor.filter({})", entityVertices.size());
+        }
+
+        AtlasGraphQuery query = context.getGraph().query().in(Constants.GUID_PROPERTY_KEY, getGuids(entityVertices));
+
+        query.addConditionsFrom(filterGraphQuery);
+
+        List<AtlasVertex> ret = getVertices(query.vertices().iterator());
+
+        ret = super.filter(ret);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== ClassificationSearchProcessor.filter({}): ret.size()={}", entityVertices.size(), ret.size());
+        }
+
+        return ret;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bcec42e3/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
index 5068fa5..a4538bd 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
@@ -20,6 +20,7 @@ package org.apache.atlas.discovery;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.AtlasException;
 import org.apache.atlas.annotation.GraphTransaction;
 import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy;
 import org.apache.atlas.exception.AtlasBaseException;
@@ -29,8 +30,8 @@ import org.apache.atlas.model.discovery.AtlasSearchResult.AtlasQueryType;
 import org.apache.atlas.model.discovery.AtlasSearchResult.AttributeSearchResult;
 import org.apache.atlas.model.discovery.SearchParameters;
 import org.apache.atlas.model.instance.AtlasEntity.Status;
-import org.apache.atlas.AtlasException;
 import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.query.Expressions.AliasExpression;
 import org.apache.atlas.query.Expressions.Expression;
 import org.apache.atlas.query.Expressions.SelectExpression;
@@ -42,16 +43,16 @@ import org.apache.atlas.query.QueryProcessor;
 import org.apache.atlas.query.SelectExpressionHelper;
 import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.MetadataRepository;
+import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
 import org.apache.atlas.repository.graph.GraphHelper;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
 import org.apache.atlas.repository.graphdb.AtlasIndexQuery.Result;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.store.graph.v1.EntityGraphRetriever;
-import org.apache.atlas.type.AtlasClassificationType;
-import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.*;
+import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType;
 import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
-import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.util.AtlasGremlinQueryProvider;
 import org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery;
 import org.apache.commons.collections.CollectionUtils;
@@ -67,13 +68,7 @@ import scala.util.parsing.combinator.Parsers.NoSuccess;
 import javax.inject.Inject;
 import javax.script.ScriptEngine;
 import javax.script.ScriptException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import static org.apache.atlas.AtlasErrorCode.CLASSIFICATION_NOT_FOUND;
 import static org.apache.atlas.AtlasErrorCode.DISCOVERY_QUERY_FAILED;
@@ -88,21 +83,20 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
     private final EntityGraphRetriever            entityRetriever;
     private final AtlasGremlinQueryProvider       gremlinQueryProvider;
     private final AtlasTypeRegistry               typeRegistry;
-    private final SearchPipeline                  searchPipeline;
+    private final GraphBackedSearchIndexer        indexer;
     private final int                             maxResultSetSize;
     private final int                             maxTypesCountInIdxQuery;
     private final int                             maxTagsCountInIdxQuery;
 
     @Inject
     EntityDiscoveryService(MetadataRepository metadataRepository, AtlasTypeRegistry typeRegistry,
-                           AtlasGraph graph, SearchPipeline searchPipeline) throws AtlasException {
+                           AtlasGraph graph, GraphBackedSearchIndexer indexer) throws AtlasException {
         this.graph                    = graph;
         this.graphPersistenceStrategy = new DefaultGraphPersistenceStrategy(metadataRepository);
         this.entityRetriever          = new EntityGraphRetriever(typeRegistry);
+        this.indexer                  = indexer;
         this.gremlinQueryProvider     = AtlasGremlinQueryProvider.INSTANCE;
         this.typeRegistry             = typeRegistry;
-        this.searchPipeline           = searchPipeline;
-
         this.maxResultSetSize         = ApplicationProperties.get().getInt(Constants.INDEX_SEARCH_MAX_RESULT_SET_SIZE, 150);
         this.maxTypesCountInIdxQuery  = ApplicationProperties.get().getInt(Constants.INDEX_SEARCH_MAX_TYPES_COUNT, 10);
         this.maxTagsCountInIdxQuery   = ApplicationProperties.get().getInt(Constants.INDEX_SEARCH_MAX_TAGS_COUNT, 10);
@@ -404,20 +398,85 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
 
     @Override
     @GraphTransaction
-    public AtlasSearchResult searchUsingBasicQuery(SearchParameters searchParameters) throws AtlasBaseException {
+    public AtlasSearchResult searchWithParameters(SearchParameters searchParameters) throws AtlasBaseException {
         AtlasSearchResult ret = new AtlasSearchResult(searchParameters);
 
-        List<AtlasVertex> resultList = searchPipeline.run(searchParameters);
+        SearchContext context = new SearchContext(searchParameters, typeRegistry, graph, indexer.getVertexIndexKeys());
+
+        List<AtlasVertex> resultList = context.getSearchProcessor().execute();
+
+        // By default any attribute that shows up in the search parameter should be sent back in the response
+        // If additional values are requested then the entityAttributes will be a superset of the all search attributes
+        // and the explicitly requested attribute(s)
+        Set<String> resultAttributes = new HashSet<>();
+        Set<String> entityAttributes = new HashSet<>();
+
+        if (CollectionUtils.isNotEmpty(searchParameters.getAttributes())) {
+            resultAttributes.addAll(searchParameters.getAttributes());
+        }
+
+        for (String resultAttribute : resultAttributes) {
+            AtlasAttribute attribute = context.getEntityType().getAttribute(resultAttribute);
+
+            if (attribute != null) {
+                AtlasType attributeType = attribute.getAttributeType();
+
+                if (attributeType instanceof AtlasArrayType) {
+                    attributeType = ((AtlasArrayType) attributeType).getElementType();
+                }
+
+                if (attributeType instanceof AtlasEntityType || attributeType instanceof AtlasObjectIdType) {
+                    entityAttributes.add(resultAttribute);
+                }
+            }
+        }
 
         for (AtlasVertex atlasVertex : resultList) {
-            AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeader(atlasVertex, searchParameters.getAttributes());
+            AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeader(atlasVertex, resultAttributes);
 
             ret.addEntity(entity);
+
+            // populate ret.referredEntities
+            for (String entityAttribute : entityAttributes) {
+                Object attrValue = entity.getAttribute(entityAttribute);
+
+                if (attrValue instanceof AtlasObjectId) {
+                    AtlasObjectId objId = (AtlasObjectId)attrValue;
+
+                    if (ret.getReferredEntities() == null) {
+                        ret.setReferredEntities(new HashMap<String, AtlasEntityHeader>());
+                    }
+
+                    if (!ret.getReferredEntities().containsKey(objId.getGuid())) {
+                        ret.getReferredEntities().put(objId.getGuid(), entityRetriever.toAtlasEntityHeader(objId.getGuid()));
+                    }
+                } else if (attrValue instanceof Collection) {
+                    Collection objIds = (Collection)attrValue;
+
+                    for (Object obj : objIds) {
+                        if (obj instanceof AtlasObjectId) {
+                            AtlasObjectId objId = (AtlasObjectId)obj;
+
+                            if (ret.getReferredEntities() == null) {
+                                ret.setReferredEntities(new HashMap<String, AtlasEntityHeader>());
+                            }
+
+                            if (!ret.getReferredEntities().containsKey(objId.getGuid())) {
+                                ret.getReferredEntities().put(objId.getGuid(), entityRetriever.toAtlasEntityHeader(objId.getGuid()));
+                            }
+                        }
+                    }
+                }
+            }
         }
 
         return ret;
     }
 
+    public int getMaxResultSetSize() {
+        return maxResultSetSize;
+    }
+
     private String getQueryForFullTextSearch(String userKeyedString, String typeName, String classification) {
         String typeFilter          = getTypeFilter(typeRegistry, typeName, maxTypesCountInIdxQuery);
         String classficationFilter = getClassificationFilter(typeRegistry, classification, maxTagsCountInIdxQuery);
@@ -447,28 +506,6 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
         return String.format("v.\"%s\":(%s)", Constants.ENTITY_TEXT_PROPERTY_KEY, queryText.toString());
     }
 
-    private static String getClassificationFilter(AtlasTypeRegistry typeRegistry, String classificationName, int maxTypesCountInIdxQuery) {
-        AtlasClassificationType classification  = typeRegistry.getClassificationTypeByName(classificationName);
-        Set<String>             typeAndSubTypes = classification != null ? classification.getTypeAndAllSubTypes() : null;
-
-        if(CollectionUtils.isNotEmpty(typeAndSubTypes) && typeAndSubTypes.size() <= maxTypesCountInIdxQuery) {
-            return String.format("(%s)", StringUtils.join(typeAndSubTypes, " "));
-        }
-
-        return "";
-    }
-
-    private static String getTypeFilter(AtlasTypeRegistry typeRegistry, String typeName, int maxTypesCountInIdxQuery) {
-        AtlasEntityType type            = typeRegistry.getEntityTypeByName(typeName);
-        Set<String>     typeAndSubTypes = type != null ? type.getTypeAndAllSubTypes() : null;
-
-        if(CollectionUtils.isNotEmpty(typeAndSubTypes) && typeAndSubTypes.size() <= maxTypesCountInIdxQuery) {
-            return String.format("(%s)", StringUtils.join(typeAndSubTypes, " "));
-        }
-
-        return "";
-    }
-
     private List<AtlasFullTextResult> getIndexQueryResults(AtlasIndexQuery query, QueryParams params, boolean excludeDeletedEntities) throws AtlasBaseException {
         List<AtlasFullTextResult> ret  = new ArrayList<>();
         Iterator<Result>          iter = query.vertices();
@@ -570,8 +607,25 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
         return excludeDeletedEntities && GraphHelper.getStatus(vertex) == Status.DELETED;
     }
 
-    public int getMaxResultSetSize() {
-        return maxResultSetSize;
+    private static String getClassificationFilter(AtlasTypeRegistry typeRegistry, String classificationName, int maxTypesCountInIdxQuery) {
+        AtlasClassificationType classification  = typeRegistry.getClassificationTypeByName(classificationName);
+        Set<String>             typeAndSubTypes = classification != null ? classification.getTypeAndAllSubTypes() : null;
+
+        if(CollectionUtils.isNotEmpty(typeAndSubTypes) && typeAndSubTypes.size() <= maxTypesCountInIdxQuery) {
+            return String.format("(%s)", StringUtils.join(typeAndSubTypes, " "));
+        }
+
+        return "";
     }
 
+    private static String getTypeFilter(AtlasTypeRegistry typeRegistry, String typeName, int maxTypesCountInIdxQuery) {
+        AtlasEntityType type            = typeRegistry.getEntityTypeByName(typeName);
+        Set<String>     typeAndSubTypes = type != null ? type.getTypeAndAllSubTypes() : null;
+
+        if(CollectionUtils.isNotEmpty(typeAndSubTypes) && typeAndSubTypes.size() <= maxTypesCountInIdxQuery) {
+            return String.format("(%s)", StringUtils.join(typeAndSubTypes, " "));
+        }
+
+        return "";
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bcec42e3/repository/src/main/java/org/apache/atlas/discovery/EntitySearchProcessor.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntitySearchProcessor.java b/repository/src/main/java/org/apache/atlas/discovery/EntitySearchProcessor.java
new file mode 100644
index 0000000..605cb15
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/discovery/EntitySearchProcessor.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.discovery;
+
+import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.*;
+import org.apache.atlas.type.AtlasClassificationType;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.utils.AtlasPerfTracer;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class EntitySearchProcessor extends SearchProcessor {
+    private static final Logger LOG      = LoggerFactory.getLogger(EntitySearchProcessor.class);
+    private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("EntitySearchProcessor");
+
+    private final AtlasIndexQuery indexQuery;
+    private final AtlasGraphQuery partialGraphQuery;
+    private final AtlasGraphQuery allGraphQuery;
+
+    public EntitySearchProcessor(SearchContext context) {
+        super(context);
+
+        AtlasEntityType         entityType         = context.getEntityType();
+        AtlasClassificationType classificationType = context.getClassificationType();
+        FilterCriteria          filterCriteria     = context.getSearchParameters().getEntityFilters();
+        Set<String>             typeAndSubTypes    = entityType.getTypeAndAllSubTypes();
+        Set<String>             solrAttributes     = new HashSet<>();
+        Set<String>             gremlinAttributes  = new HashSet<>();
+        Set<String>             allAttributes      = new HashSet<>();
+
+
+        processSearchAttributes(entityType, filterCriteria, solrAttributes, gremlinAttributes, allAttributes);
+
+        boolean useSolrSearch = typeAndSubTypes.size() <= MAX_ENTITY_TYPES_IN_INDEX_QUERY && canApplySolrFilter(entityType, filterCriteria, false);
+
+        if (useSolrSearch) {
+            StringBuilder solrQuery = new StringBuilder();
+
+            constructTypeTestQuery(solrQuery, typeAndSubTypes);
+            constructFilterQuery(solrQuery, entityType, filterCriteria, solrAttributes);
+
+            String solrQueryString = STRAY_AND_PATTERN.matcher(solrQuery).replaceAll(")");
+
+            solrQueryString = STRAY_OR_PATTERN.matcher(solrQueryString).replaceAll(")");
+            solrQueryString = STRAY_ELIPSIS_PATTERN.matcher(solrQueryString).replaceAll("");
+
+            indexQuery = context.getGraph().indexQuery(Constants.VERTEX_INDEX, solrQueryString);
+
+            if (CollectionUtils.isNotEmpty(gremlinAttributes) || classificationType != null) {
+                AtlasGraphQuery query = context.getGraph().query();
+
+                addClassificationNameConditionIfNecessary(query);
+
+                partialGraphQuery = toGremlinFilterQuery(entityType, filterCriteria, gremlinAttributes, query);
+            } else {
+                partialGraphQuery = null;
+            }
+        } else {
+            indexQuery      = null;
+            partialGraphQuery = null;
+        }
+
+        AtlasGraphQuery query = context.getGraph().query().in(Constants.TYPE_NAME_PROPERTY_KEY, typeAndSubTypes);
+
+        addClassificationNameConditionIfNecessary(query);
+
+        allGraphQuery = toGremlinFilterQuery(entityType, filterCriteria, allAttributes, query);
+
+        if (context.getSearchParameters().getExcludeDeletedEntities()) {
+            allGraphQuery.has(Constants.STATE_PROPERTY_KEY, "ACTIVE");
+        }
+    }
+
+    @Override
+    public List<AtlasVertex> execute() {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> EntitySearchProcessor.execute({})", context);
+        }
+
+        List<AtlasVertex> ret = new ArrayList<>();
+
+        AtlasPerfTracer perf = null;
+
+        if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+            perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntitySearchProcessor.execute(" + context +  ")");
+        }
+
+        try {
+            int qryOffset = (nextProcessor == null) ? context.getSearchParameters().getOffset() : 0;
+            int limit     = context.getSearchParameters().getLimit();
+            int resultIdx = qryOffset;
+
+            while (ret.size() < limit) {
+                if (context.terminateSearch()) {
+                    LOG.warn("query terminated: {}", context.getSearchParameters());
+
+                    break;
+                }
+
+                List<AtlasVertex> vertices;
+
+                if (indexQuery != null) {
+                    Iterator<AtlasIndexQuery.Result> queryResult = indexQuery.vertices(qryOffset, limit);
+
+                    if (!queryResult.hasNext()) { // no more results from solr - end of search
+                        break;
+                    }
+
+                    vertices = getVerticesFromIndexQueryResult(queryResult);
+
+                    if (partialGraphQuery != null) {
+                        AtlasGraphQuery guidQuery = context.getGraph().query().in(Constants.GUID_PROPERTY_KEY, getGuids(vertices));
+
+                        guidQuery.addConditionsFrom(partialGraphQuery);
+
+                        vertices = getVertices(guidQuery.vertices().iterator());
+                    }
+                } else {
+                    Iterator<AtlasVertex> queryResult = allGraphQuery.vertices(qryOffset, limit).iterator();
+
+                    if (!queryResult.hasNext()) { // no more results from query - end of search
+                        break;
+                    }
+
+                    vertices = getVertices(queryResult);
+                }
+
+                qryOffset += limit;
+
+                vertices = super.filter(vertices);
+
+                for (AtlasVertex vertex : vertices) {
+                    resultIdx++;
+
+                    if (resultIdx < context.getSearchParameters().getOffset()) {
+                        continue;
+                    }
+
+                    ret.add(vertex);
+
+                    if (ret.size() == limit) {
+                        break;
+                    }
+                }
+            }
+        } finally {
+            AtlasPerfTracer.log(perf);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== EntitySearchProcessor.execute({}): ret.size()={}", context, ret.size());
+        }
+
+        return ret;
+    }
+
+    @Override
+    public List<AtlasVertex> filter(List<AtlasVertex> entityVertices) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> EntitySearchProcessor.filter({})", entityVertices.size());
+        }
+
+        AtlasGraphQuery query = context.getGraph().query().in(Constants.GUID_PROPERTY_KEY, getGuids(entityVertices));
+
+        query.addConditionsFrom(allGraphQuery);
+
+        List<AtlasVertex> ret = getVertices(query.vertices().iterator());
+
+        ret = super.filter(ret);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== EntitySearchProcessor.filter({}): ret.size()={}", entityVertices.size(), ret.size());
+        }
+
+        return ret;
+    }
+
+    private void addClassificationNameConditionIfNecessary(AtlasGraphQuery query) {
+        if (context.getClassificationType() != null && !context.needClassificationProcessor()) {
+            query.in(Constants.TRAIT_NAMES_PROPERTY_KEY, context.getClassificationType().getTypeAndAllSubTypes());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bcec42e3/repository/src/main/java/org/apache/atlas/discovery/FullTextSearchProcessor.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/FullTextSearchProcessor.java b/repository/src/main/java/org/apache/atlas/discovery/FullTextSearchProcessor.java
new file mode 100644
index 0000000..4ddd642
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/discovery/FullTextSearchProcessor.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.discovery;
+
+import org.apache.atlas.model.discovery.SearchParameters;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.utils.AtlasPerfTracer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+
+public class FullTextSearchProcessor extends SearchProcessor {
+    private static final Logger LOG      = LoggerFactory.getLogger(FullTextSearchProcessor.class);
+    private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("FullTextSearchProcessor");
+
+    private final AtlasIndexQuery indexQuery;
+
+    public FullTextSearchProcessor(SearchContext context) {
+        super(context);
+
+        SearchParameters searchParameters = context.getSearchParameters();
+        String           queryString      = String.format("v.\"%s\":(%s)", Constants.ENTITY_TEXT_PROPERTY_KEY, searchParameters.getQuery());
+
+        indexQuery = context.getGraph().indexQuery(Constants.FULLTEXT_INDEX, queryString);
+    }
+
+    @Override
+    public List<AtlasVertex> execute() {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> FullTextSearchProcessor.execute({})", context);
+        }
+
+        List<AtlasVertex> ret = new ArrayList<>();
+
+        AtlasPerfTracer perf = null;
+
+        if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+            perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "FullTextSearchProcessor.execute(" + context +  ")");
+        }
+
+        try {
+            int qryOffset = nextProcessor == null ? context.getSearchParameters().getOffset() : 0;
+            int limit     = context.getSearchParameters().getLimit();
+            int resultIdx = qryOffset;
+
+            while (ret.size() < limit) {
+                if (context.terminateSearch()) {
+                    LOG.warn("query terminated: {}", context.getSearchParameters());
+
+                    break;
+                }
+
+                Iterator<AtlasIndexQuery.Result> idxQueryResult = indexQuery.vertices(qryOffset, limit);
+
+                if (!idxQueryResult.hasNext()) { // no more results from solr - end of search
+                    break;
+                }
+
+                qryOffset += limit;
+
+                List<AtlasVertex> vertices = getVerticesFromIndexQueryResult(idxQueryResult);
+
+                vertices = super.filter(vertices);
+
+                for (AtlasVertex vertex : vertices) {
+                    resultIdx++;
+
+                    if (resultIdx < context.getSearchParameters().getOffset()) {
+                        continue;
+                    }
+
+                    ret.add(vertex);
+
+                    if (ret.size() == limit) {
+                        break;
+                    }
+                }
+            }
+        } finally {
+            AtlasPerfTracer.log(perf);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== FullTextSearchProcessor.execute({}): ret.size()={}", context, ret.size());
+        }
+
+        return ret;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bcec42e3/repository/src/main/java/org/apache/atlas/discovery/GremlinStep.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/GremlinStep.java b/repository/src/main/java/org/apache/atlas/discovery/GremlinStep.java
deleted file mode 100644
index 1056b3e..0000000
--- a/repository/src/main/java/org/apache/atlas/discovery/GremlinStep.java
+++ /dev/null
@@ -1,389 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.discovery;
-
-import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.discovery.SearchParameters;
-import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria;
-import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria.Condition;
-import org.apache.atlas.model.discovery.SearchParameters.Operator;
-import org.apache.atlas.repository.Constants;
-import org.apache.atlas.repository.graphdb.AtlasEdge;
-import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
-import org.apache.atlas.repository.graphdb.AtlasGraph;
-import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
-import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
-import org.apache.atlas.repository.graphdb.AtlasVertex;
-import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1;
-import org.apache.atlas.type.AtlasClassificationType;
-import org.apache.atlas.type.AtlasEntityType;
-import org.apache.atlas.type.AtlasStructType;
-import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.atlas.utils.AtlasPerfTracer;
-import org.apache.commons.collections.CollectionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
-
-import javax.inject.Inject;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-
-import static org.apache.atlas.discovery.SearchPipeline.IndexResultType;
-import static org.apache.atlas.discovery.SearchPipeline.PipelineContext;
-import static org.apache.atlas.discovery.SearchPipeline.PipelineStep;
-import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.ComparisionOperator;
-import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.MatchingOperator;
-
-@Component
-public class GremlinStep implements PipelineStep {
-    private static final Logger LOG      = LoggerFactory.getLogger(GremlinStep.class);
-    private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("GremlinSearchStep");
-
-    private final AtlasGraph        graph;
-    private final AtlasTypeRegistry typeRegistry;
-
-    enum GremlinFilterQueryType { TAG, ENTITY }
-
-    @Inject
-    public GremlinStep(AtlasGraph graph, AtlasTypeRegistry typeRegistry) {
-        this.graph        = graph;
-        this.typeRegistry = typeRegistry;
-    }
-
-    @Override
-    public void execute(PipelineContext context) throws AtlasBaseException {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("==> GremlinStep.execute({})", context);
-        }
-
-        if (context == null) {
-            throw new AtlasBaseException("Can't start search without any context");
-        }
-
-        AtlasPerfTracer perf = null;
-
-        if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
-            perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "GremlinSearchStep.execute(" + context +  ")");
-        }
-
-        final Iterator<AtlasVertex> result;
-
-        if (context.hasIndexResults()) {
-            // We have some results from the indexed step, let's proceed accordingly
-            if (context.getIndexResultType() == IndexResultType.TAG) {
-                // Index search was done on tag and filters
-                if (context.isTagProcessingComplete()) {
-                    LOG.debug("GremlinStep.execute(): index has completely processed tag, further TAG filtering not needed");
-
-                    Set<String> taggedVertexGUIDs = new HashSet<>();
-
-                    Iterator<AtlasIndexQuery.Result> tagVertexIterator = context.getIndexResultsIterator();
-
-                    while (tagVertexIterator.hasNext()) {
-                        // Find out which Vertex has this outgoing edge
-                        AtlasVertex         vertex = tagVertexIterator.next().getVertex();
-                        Iterable<AtlasEdge> edges  = vertex.getEdges(AtlasEdgeDirection.IN);
-
-                        for (AtlasEdge edge : edges) {
-                            String guid = AtlasGraphUtilsV1.getIdFromVertex(edge.getOutVertex());
-
-                            taggedVertexGUIDs.add(guid);
-                        }
-                    }
-
-                    // No entities are tagged  (actually this check is already done)
-                    if (!taggedVertexGUIDs.isEmpty()) {
-                        result = processEntity(taggedVertexGUIDs, context);
-                    } else {
-                        result = null;
-                    }
-                } else {
-                    result = processTagAndEntity(Collections.<String>emptySet(), context);
-                }
-            } else if (context.getIndexResultType() == IndexResultType.TEXT) {
-                // Index step processed full-text;
-                Set<String> entityIDs = getVertexIDs(context.getIndexResultsIterator());
-
-                result = processTagAndEntity(entityIDs, context);
-            } else if (context.getIndexResultType() == IndexResultType.ENTITY) {
-                // Index step processed entity and it's filters; tag filter wouldn't be set
-                Set<String> entityIDs = getVertexIDs(context.getIndexResultsIterator());
-
-                result = processEntity(entityIDs, context);
-            } else {
-                result = null;
-            }
-        } else {
-            // No index results, need full processing in Gremlin
-            if (context.getClassificationType() != null) {
-                // Process tag and filters first, then entity filters
-                result = processTagAndEntity(Collections.<String>emptySet(), context);
-            } else {
-                result = processEntity(Collections.<String>emptySet(), context);
-            }
-        }
-
-        context.setGremlinResultIterator(result);
-
-        AtlasPerfTracer.log(perf);
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("<== GremlinStep.execute({})", context);
-        }
-    }
-
-    private Iterator<AtlasVertex> processEntity(Set<String> entityGUIDs, PipelineContext context) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("==> GremlinStep.processEntity(entityGUIDs={})", entityGUIDs);
-        }
-
-        final Iterator<AtlasVertex> ret;
-
-        SearchParameters searchParameters = context.getSearchParameters();
-        AtlasEntityType  entityType       = context.getEntityType();
-
-        if (entityType != null) {
-            AtlasGraphQuery entityFilterQuery = context.getGraphQuery("ENTITY_FILTER");
-
-            if (entityFilterQuery == null) {
-                entityFilterQuery = graph.query().in(Constants.TYPE_NAME_PROPERTY_KEY, entityType.getTypeAndAllSubTypes());
-
-                if (searchParameters.getEntityFilters() != null) {
-                    toGremlinFilterQuery(GremlinFilterQueryType.ENTITY, entityType, searchParameters.getEntityFilters(), entityFilterQuery, context);
-                }
-
-                if (searchParameters.getExcludeDeletedEntities()) {
-                    entityFilterQuery.has(Constants.STATE_PROPERTY_KEY, "ACTIVE");
-                }
-
-                context.cacheGraphQuery("ENTITY_FILTER", entityFilterQuery);
-            }
-
-            // Now get all vertices
-            if (CollectionUtils.isEmpty(entityGUIDs)) {
-                ret = entityFilterQuery.vertices(context.getCurrentOffset(), context.getMaxLimit()).iterator();
-            } else {
-                AtlasGraphQuery guidQuery = graph.query().in(Constants.GUID_PROPERTY_KEY, entityGUIDs);
-
-                if (entityFilterQuery != null) {
-                    guidQuery.addConditionsFrom(entityFilterQuery);
-                } else if (searchParameters.getExcludeDeletedEntities()) {
-                    guidQuery.has(Constants.STATE_PROPERTY_KEY, "ACTIVE");
-                }
-
-                ret = guidQuery.vertices(context.getMaxLimit()).iterator();
-            }
-        } else if (CollectionUtils.isNotEmpty(entityGUIDs)) {
-            AtlasGraphQuery guidQuery = graph.query().in(Constants.GUID_PROPERTY_KEY, entityGUIDs);
-
-            if (searchParameters.getExcludeDeletedEntities()) {
-                guidQuery.has(Constants.STATE_PROPERTY_KEY, "ACTIVE");
-            }
-
-            Iterable<AtlasVertex> vertices = guidQuery.vertices(context.getMaxLimit());
-
-            ret = vertices.iterator();
-        } else {
-            ret = null;
-        }
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("<== GremlinStep.processEntity(entityGUIDs={})", entityGUIDs);
-        }
-
-        return ret;
-    }
-
-    private Iterator<AtlasVertex> processTagAndEntity(Set<String> entityGUIDs, PipelineContext context) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("==> GremlinStep.processTagAndEntity(entityGUIDs={})", entityGUIDs);
-        }
-
-        final Iterator<AtlasVertex> ret;
-
-        AtlasClassificationType classificationType = context.getClassificationType();
-
-        if (classificationType != null) {
-            AtlasGraphQuery  tagVertexQuery = context.getGraphQuery("TAG_VERTEX");
-
-            if (tagVertexQuery == null) {
-                tagVertexQuery = graph.query().in(Constants.TYPE_NAME_PROPERTY_KEY, classificationType.getTypeAndAllSubTypes());
-
-                SearchParameters searchParameters = context.getSearchParameters();
-
-                // Do tag filtering first as it'll return a smaller subset of vertices
-                if (searchParameters.getTagFilters() != null) {
-                    toGremlinFilterQuery(GremlinFilterQueryType.TAG, classificationType, searchParameters.getTagFilters(), tagVertexQuery, context);
-                }
-
-                context.cacheGraphQuery("TAG_VERTEX", tagVertexQuery);
-            }
-
-            if (tagVertexQuery != null) {
-                Set<String> taggedVertexGuids = new HashSet<>();
-                // Now get all vertices after adjusting offset for each iteration
-                LOG.debug("Firing TAG query");
-
-                Iterator<AtlasVertex> tagVertexIterator = tagVertexQuery.vertices(context.getCurrentOffset(), context.getMaxLimit()).iterator();
-
-                while (tagVertexIterator.hasNext()) {
-                    // Find out which Vertex has this outgoing edge
-                    Iterable<AtlasEdge> edges = tagVertexIterator.next().getEdges(AtlasEdgeDirection.IN);
-                    for (AtlasEdge edge : edges) {
-                        String guid = AtlasGraphUtilsV1.getIdFromVertex(edge.getOutVertex());
-                        taggedVertexGuids.add(guid);
-                    }
-                }
-
-                entityGUIDs = taggedVertexGuids;
-            }
-        }
-
-        if (!entityGUIDs.isEmpty()) {
-            ret = processEntity(entityGUIDs, context);
-        } else {
-            ret = null;
-        }
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("<== GremlinStep.processTagAndEntity(entityGUIDs={})", entityGUIDs);
-        }
-
-        return ret;
-    }
-
-    private Set<String> getVertexIDs(Iterator<AtlasIndexQuery.Result> idxResultsIterator) {
-        Set<String> guids = new HashSet<>();
-        while (idxResultsIterator.hasNext()) {
-            AtlasVertex vertex = idxResultsIterator.next().getVertex();
-            String guid = AtlasGraphUtilsV1.getIdFromVertex(vertex);
-            guids.add(guid);
-        }
-        return guids;
-    }
-
-    private Set<String> getVertexIDs(Iterable<AtlasVertex> vertices) {
-        Set<String> guids = new HashSet<>();
-        for (AtlasVertex vertex : vertices) {
-            String guid = AtlasGraphUtilsV1.getIdFromVertex(vertex);
-            guids.add(guid);
-        }
-        return guids;
-    }
-
-    private AtlasGraphQuery toGremlinFilterQuery(GremlinFilterQueryType queryType, AtlasStructType type, FilterCriteria criteria,
-                                                 AtlasGraphQuery query, PipelineContext context) {
-        if (criteria.getCondition() != null) {
-            if (criteria.getCondition() == Condition.AND) {
-                for (FilterCriteria filterCriteria : criteria.getCriterion()) {
-                    AtlasGraphQuery nestedQuery = toGremlinFilterQuery(queryType, type, filterCriteria, graph.query(), context);
-                    query.addConditionsFrom(nestedQuery);
-                }
-            } else {
-                List<AtlasGraphQuery> orConditions = new LinkedList<>();
-
-                for (FilterCriteria filterCriteria : criteria.getCriterion()) {
-                    AtlasGraphQuery nestedQuery = toGremlinFilterQuery(queryType, type, filterCriteria, graph.query(), context);
-                    // FIXME: Something might not be right here as the queries are getting overwritten sometimes
-                    orConditions.add(graph.query().createChildQuery().addConditionsFrom(nestedQuery));
-                }
-
-                if (!orConditions.isEmpty()) {
-                    query.or(orConditions);
-                }
-            }
-        } else {
-            String   attrName  = criteria.getAttributeName();
-            String   attrValue = criteria.getAttributeValue();
-            Operator operator  = criteria.getOperator();
-
-            try {
-                // If attribute belongs to supertype then adjust the name accordingly
-                final String  qualifiedAttributeName;
-                final boolean attrProcessed;
-
-                if (queryType == GremlinFilterQueryType.TAG) {
-                    qualifiedAttributeName = type.getQualifiedAttributeName(attrName);
-                    attrProcessed          = context.hasProcessedTagAttribute(qualifiedAttributeName);
-                } else {
-                    qualifiedAttributeName = type.getQualifiedAttributeName(attrName);
-                    attrProcessed          = context.hasProcessedEntityAttribute(qualifiedAttributeName);
-                }
-
-                // Check if the qualifiedAttribute has been processed
-                if (!attrProcessed) {
-                    switch (operator) {
-                        case LT:
-                            query.has(qualifiedAttributeName, ComparisionOperator.LESS_THAN, attrValue);
-                            break;
-                        case LTE:
-                            query.has(qualifiedAttributeName, ComparisionOperator.LESS_THAN_EQUAL, attrValue);
-                            break;
-                        case GT:
-                            query.has(qualifiedAttributeName, ComparisionOperator.GREATER_THAN, attrValue);
-                            break;
-                        case GTE:
-                            query.has(qualifiedAttributeName, ComparisionOperator.GREATER_THAN_EQUAL, attrValue);
-                            break;
-                        case EQ:
-                            query.has(qualifiedAttributeName, ComparisionOperator.EQUAL, attrValue);
-                            break;
-                        case NEQ:
-                            query.has(qualifiedAttributeName, ComparisionOperator.NOT_EQUAL, attrValue);
-                            break;
-                        case LIKE:
-                            // TODO: Maybe we need to validate pattern
-                            query.has(qualifiedAttributeName, MatchingOperator.REGEX, getLikeRegex(attrValue));
-                            break;
-                        case CONTAINS:
-                            query.has(qualifiedAttributeName, MatchingOperator.REGEX, getContainsRegex(attrValue));
-                            break;
-                        case STARTS_WITH:
-                            query.has(qualifiedAttributeName, MatchingOperator.PREFIX, attrValue);
-                            break;
-                        case ENDS_WITH:
-                            query.has(qualifiedAttributeName, MatchingOperator.REGEX, getSuffixRegex(attrValue));
-                            break;
-                        case IN:
-                            LOG.warn("{}: unsupported operator. Ignored", operator);
-                            break;
-                    }
-                }
-            } catch (AtlasBaseException e) {
-                LOG.error("toGremlinFilterQuery(): failed for attrName=" + attrName + "; operator=" + operator + "; attrValue=" + attrValue, e);
-            }
-        }
-
-        return query;
-    }
-
-    private String getContainsRegex(String attributeValue) {
-        return ".*" + attributeValue + ".*";
-    }
-
-    private String getSuffixRegex(String attributeValue) {
-        return ".*" + attributeValue;
-    }
-
-    private String getLikeRegex(String attributeValue) { return ".*" + attributeValue + ".*"; }
-}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bcec42e3/repository/src/main/java/org/apache/atlas/discovery/SearchContext.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/SearchContext.java b/repository/src/main/java/org/apache/atlas/discovery/SearchContext.java
new file mode 100644
index 0000000..2125d61
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/discovery/SearchContext.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.discovery;
+
+
+import org.apache.atlas.model.discovery.SearchParameters;
+import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.type.AtlasClassificationType;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Set;
+
+
+public class SearchContext {
+    private final SearchParameters        searchParameters;
+    private final AtlasTypeRegistry       typeRegistry;
+    private final AtlasGraph              graph;
+    private final Set<String>             indexedKeys;
+    private final AtlasEntityType         entityType;
+    private final AtlasClassificationType classificationType;
+    private       SearchProcessor         searchProcessor;
+    private       boolean                 terminateSearch = false;
+
+    public SearchContext(SearchParameters searchParameters, AtlasTypeRegistry typeRegistry, AtlasGraph graph, Set<String> indexedKeys) {
+        this.searchParameters   = searchParameters;
+        this.typeRegistry       = typeRegistry;
+        this.graph              = graph;
+        this.indexedKeys        = indexedKeys;
+        this.entityType         = typeRegistry.getEntityTypeByName(searchParameters.getTypeName());
+        this.classificationType = typeRegistry.getClassificationTypeByName(searchParameters.getClassification());
+
+        if (needFullTextrocessor()) {
+            addProcessor(new FullTextSearchProcessor(this));
+        }
+
+        if (needClassificationProcessor()) {
+            addProcessor(new ClassificationSearchProcessor(this));
+        }
+
+        if (needEntityProcessor()) {
+            addProcessor(new EntitySearchProcessor(this));
+
+        }
+    }
+
+    public SearchParameters getSearchParameters() { return searchParameters; }
+
+    public AtlasTypeRegistry getTypeRegistry() { return typeRegistry; }
+
+    public AtlasGraph getGraph() { return graph; }
+
+    public Set<String> getIndexedKeys() { return indexedKeys; }
+
+    public AtlasEntityType getEntityType() { return entityType; }
+
+    public AtlasClassificationType getClassificationType() { return classificationType; }
+
+    public SearchProcessor getSearchProcessor() { return searchProcessor; }
+
+    public boolean terminateSearch() { return this.terminateSearch; }
+
+    public void terminateSearch(boolean terminateSearch) { this.terminateSearch = terminateSearch; }
+
+    public StringBuilder toString(StringBuilder sb) {
+        if (sb == null) {
+            sb = new StringBuilder();
+        }
+
+        sb.append("searchParameters=");
+
+        if (searchParameters != null) {
+            searchParameters.toString(sb);
+        }
+
+        return sb;
+    }
+
+    @Override
+    public String toString() {
+        return toString(new StringBuilder()).toString();
+    }
+
+    public boolean needFullTextrocessor() {
+        return StringUtils.isNotEmpty(searchParameters.getQuery());
+    }
+
+    public boolean needClassificationProcessor() {
+        return classificationType != null && (hasAttributeFilter(searchParameters.getTagFilters()) || entityType == null);
+    }
+
+    public boolean needEntityProcessor() {
+        return entityType != null;
+    }
+
+    private boolean hasAttributeFilter(FilterCriteria filterCriteria) {
+        return filterCriteria != null &&
+               (CollectionUtils.isNotEmpty(filterCriteria.getCriterion()) || StringUtils.isNotEmpty(filterCriteria.getAttributeName()));
+    }
+
+    private void addProcessor(SearchProcessor processor) {
+        if (this.searchProcessor == null) {
+            this.searchProcessor = processor;
+        } else {
+            this.searchProcessor.addProcessor(processor);
+        }
+    }
+}