You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sh...@apache.org on 2016/07/20 13:03:07 UTC

[4/5] incubator-atlas git commit: ATLAS-347 Atlas search APIs should allow pagination of results (shwethags)

ATLAS-347 Atlas search APIs should allow pagination of results (shwethags)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/085d5c86
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/085d5c86
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/085d5c86

Branch: refs/heads/master
Commit: 085d5c8642f6a47f25cf7ed6d6e3526dbbdc625f
Parents: 67acb9d
Author: Shwetha GS <ss...@hortonworks.com>
Authored: Wed Jul 20 18:13:58 2016 +0530
Committer: Shwetha GS <ss...@hortonworks.com>
Committed: Wed Jul 20 18:13:58 2016 +0530

----------------------------------------------------------------------
 .../atlas/hive/bridge/HiveMetaStoreBridge.java  |  14 +-
 .../hive/bridge/HiveMetaStoreBridgeTest.java    |  25 ++--
 .../apache/atlas/sqoop/hook/SqoopHookIT.java    |   4 +-
 .../atlas/storm/hook/StormAtlasHookIT.java      |   2 +-
 .../main/java/org/apache/atlas/AtlasClient.java |  49 +++----
 .../java/org/apache/atlas/AtlasProperties.java  |  64 +++++++++
 .../org/apache/atlas/utils/ParamChecker.java    |   8 +-
 docs/src/site/twiki/Configuration.twiki         |  12 ++
 docs/src/site/twiki/Search.twiki                |  31 ++--
 release-log.txt                                 |   1 +
 .../atlas/discovery/DataSetLineageService.java  |   9 +-
 .../atlas/discovery/DiscoveryService.java       |  59 ++++++++
 .../graph/DefaultGraphPersistenceStrategy.java  |   5 +-
 .../graph/GraphBackedDiscoveryService.java      |  92 ++++++------
 .../org/apache/atlas/query/Expressions.scala    |  19 ++-
 .../org/apache/atlas/query/GremlinQuery.scala   |  17 ++-
 .../org/apache/atlas/query/QueryParser.scala    |  60 +++++---
 .../scala/org/apache/atlas/query/Resolver.scala |   2 +-
 .../org/apache/atlas/query/TypeUtils.scala      |   4 +-
 .../discovery/DataSetLineageServiceTest.java    |   3 +-
 .../GraphBackedDiscoveryServiceTest.java        | 140 ++++++++++++-------
 .../GraphBackedMetadataRepositoryTest.java      |  14 +-
 .../service/DefaultMetadataServiceTest.java     |   6 +-
 .../atlas/discovery/DiscoveryService.java       |  52 -------
 .../java/org/apache/atlas/LocalAtlasClient.java |  11 +-
 .../org/apache/atlas/examples/QuickStart.java   |   2 +-
 .../resources/MetadataDiscoveryResource.java    |  92 ++++++++----
 .../NotificationHookConsumerIT.java             |  14 +-
 .../atlas/web/resources/BaseResourceIT.java     |   5 +
 .../web/resources/EntityJerseyResourceIT.java   |   7 +-
 .../MetadataDiscoveryJerseyResourceIT.java      | 125 +++++++++++------
 31 files changed, 579 insertions(+), 369 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/085d5c86/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index 9a5e279..3d3aef2 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -53,7 +53,6 @@ import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -218,7 +217,7 @@ public class HiveMetaStoreBridge {
 
     private Referenceable getEntityReferenceFromDSL(String typeName, String dslQuery) throws Exception {
         AtlasClient dgiClient = getAtlasClient();
-        JSONArray results = dgiClient.searchByDSL(dslQuery);
+        JSONArray results = dgiClient.searchByDSL(dslQuery, 1, 0);
         if (results.length() == 0) {
             return null;
         } else {
@@ -501,17 +500,6 @@ public class HiveMetaStoreBridge {
         atlasClient.updateEntity(referenceable.getId().id, referenceable);
     }
 
-    private Referenceable getEntityReferenceFromGremlin(String typeName, String gremlinQuery)
-    throws AtlasServiceException, JSONException {
-        AtlasClient client = getAtlasClient();
-        JSONArray results = client.searchByGremlin(gremlinQuery);
-        if (results.length() == 0) {
-            return null;
-        }
-        String guid = results.getJSONObject(0).getString(SEARCH_ENTRY_GUID_ATTR);
-        return new Referenceable(guid, typeName, null);
-    }
-
     public Referenceable fillStorageDesc(StorageDescriptor storageDesc, String tableQualifiedName,
         String sdQualifiedName, Id tableId) throws Exception {
         LOG.debug("Filling storage descriptor information for " + storageDesc);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/085d5c86/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
index 9f7f6b0..f8aa93a 100644
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
@@ -41,7 +40,6 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 import scala.actors.threadpool.Arrays;
 
-import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -98,12 +96,12 @@ public class HiveMetaStoreBridgeTest {
 
         // return existing table
         when(atlasClient.searchByDSL(HiveMetaStoreBridge.getTableDSLQuery(CLUSTER_NAME, TEST_DB_NAME, TEST_TABLE_NAME,
-                HiveDataTypes.HIVE_TABLE.getName(), false))).thenReturn(
+                HiveDataTypes.HIVE_TABLE.getName(), false), 1, 0)).thenReturn(
                 getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
         when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
         String processQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, hiveTables.get(0));
         when(atlasClient.searchByDSL(HiveMetaStoreBridge.getProcessDSLQuery(HiveDataTypes.HIVE_PROCESS.getName(),
-                processQualifiedName))).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
+                processQualifiedName), 1, 0)).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
 
         HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
         bridge.importHiveMetadata(true);
@@ -117,7 +115,7 @@ public class HiveMetaStoreBridgeTest {
     private void returnExistingDatabase(String databaseName, AtlasClient atlasClient, String clusterName)
             throws AtlasServiceException, JSONException {
         when(atlasClient.searchByDSL(HiveMetaStoreBridge.getDatabaseDSLQuery(clusterName, databaseName,
-                HiveDataTypes.HIVE_DB.getName()))).thenReturn(
+                HiveDataTypes.HIVE_DB.getName()), 1, 0)).thenReturn(
                 getEntityReference("72e06b34-9151-4023-aa9d-b82103a50e76"));
     }
 
@@ -147,12 +145,11 @@ public class HiveMetaStoreBridgeTest {
         returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
 
         when(atlasClient.searchByDSL(HiveMetaStoreBridge.getTableDSLQuery(CLUSTER_NAME, TEST_DB_NAME,
-            TEST_TABLE_NAME,
-            HiveDataTypes.HIVE_TABLE.getName(), false))).thenReturn(
+            TEST_TABLE_NAME, HiveDataTypes.HIVE_TABLE.getName(), false), 1, 0)).thenReturn(
             getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
         String processQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, hiveTable);
         when(atlasClient.searchByDSL(HiveMetaStoreBridge.getProcessDSLQuery(HiveDataTypes.HIVE_PROCESS.getName(),
-                processQualifiedName))).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
+                processQualifiedName), 1, 0)).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
         when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
 
         Partition partition = mock(Partition.class);
@@ -180,13 +177,12 @@ public class HiveMetaStoreBridgeTest {
         when(hiveClient.getTable(TEST_DB_NAME, TEST_TABLE_NAME)).thenThrow(new RuntimeException("Timeout while reading data from hive metastore"));
 
         when(atlasClient.searchByDSL(HiveMetaStoreBridge.getTableDSLQuery(CLUSTER_NAME, TEST_DB_NAME,
-            table2Name,
-            HiveDataTypes.HIVE_TABLE.getName(), false))).thenReturn(
+            table2Name, HiveDataTypes.HIVE_TABLE.getName(), false), 1, 0)).thenReturn(
             getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
         when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
         String processQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, hiveTables.get(1));
         when(atlasClient.searchByDSL(HiveMetaStoreBridge.getProcessDSLQuery(HiveDataTypes.HIVE_PROCESS.getName(),
-            processQualifiedName))).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
+            processQualifiedName), 1, 0)).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
 
         HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
         try {
@@ -206,13 +202,12 @@ public class HiveMetaStoreBridgeTest {
         when(hiveClient.getTable(TEST_DB_NAME, TEST_TABLE_NAME)).thenThrow(new RuntimeException("Timeout while reading data from hive metastore"));
 
         when(atlasClient.searchByDSL(HiveMetaStoreBridge.getTableDSLQuery(CLUSTER_NAME, TEST_DB_NAME,
-            table2Name,
-            HiveDataTypes.HIVE_TABLE.getName(), false))).thenReturn(
+            table2Name, HiveDataTypes.HIVE_TABLE.getName(), false), 10, 0)).thenReturn(
             getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
         when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
         String processQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, hiveTables.get(1));
         when(atlasClient.searchByDSL(HiveMetaStoreBridge.getProcessDSLQuery(HiveDataTypes.HIVE_PROCESS.getName(),
-            processQualifiedName))).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
+            processQualifiedName), 10, 0)).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
 
         HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
         try {
@@ -255,6 +250,4 @@ public class HiveMetaStoreBridgeTest {
             return attrValue.equals(((Referenceable) o).get(attrName));
         }
     }
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/085d5c86/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java
----------------------------------------------------------------------
diff --git a/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java b/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java
index e965c7d..577fde6 100644
--- a/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java
+++ b/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java
@@ -131,12 +131,12 @@ public class SqoopHookIT {
         waitFor(MAX_WAIT_TIME, new Predicate() {
             @Override
             public boolean evaluate() throws Exception {
-                JSONArray results = atlasClient.search(query);
+                JSONArray results = atlasClient.search(query, 10, 0);
                 return results.length() > 0;
             }
         });
 
-        JSONArray results = atlasClient.search(query);
+        JSONArray results = atlasClient.search(query, 10, 0);
         JSONObject row = results.getJSONObject(0).getJSONObject("t");
 
         return row.getString("id");

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/085d5c86/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java
----------------------------------------------------------------------
diff --git a/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java
index b33bb5f..e4a0d69 100644
--- a/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java
+++ b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java
@@ -139,7 +139,7 @@ public class StormAtlasHookIT {
         String query = String.format("from %s where name = \"%s\"",
                 StormDataTypes.STORM_TOPOLOGY.getName(), TOPOLOGY_NAME);
 
-        JSONArray results = atlasClient.search(query);
+        JSONArray results = atlasClient.search(query, 10, 0);
         JSONObject row = results.getJSONObject(0);
 
         return row.has("$id$") ? row.getJSONObject("$id$").getString("id"): null;

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/085d5c86/client/src/main/java/org/apache/atlas/AtlasClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/AtlasClient.java b/client/src/main/java/org/apache/atlas/AtlasClient.java
index d3af6ad..d7543f2 100755
--- a/client/src/main/java/org/apache/atlas/AtlasClient.java
+++ b/client/src/main/java/org/apache/atlas/AtlasClient.java
@@ -101,6 +101,8 @@ public class AtlasClient {
     public static final String URI_TRAITS = "traits";
 
     public static final String QUERY = "query";
+    public static final String LIMIT = "limit";
+    public static final String OFFSET = "offset";
     public static final String QUERY_TYPE = "queryType";
     public static final String ATTRIBUTE_NAME = "property";
     public static final String ATTRIBUTE_VALUE = "value";
@@ -479,7 +481,6 @@ public class AtlasClient {
         //Search operations
         SEARCH(BASE_URI + URI_SEARCH, HttpMethod.GET, Response.Status.OK),
         SEARCH_DSL(BASE_URI + URI_SEARCH + "/dsl", HttpMethod.GET, Response.Status.OK),
-        SEARCH_GREMLIN(BASE_URI + URI_SEARCH + "/gremlin", HttpMethod.GET, Response.Status.OK),
         SEARCH_FULL_TEXT(BASE_URI + URI_SEARCH + "/fulltext", HttpMethod.GET, Response.Status.OK),
 
         //Lineage operations based on dataset name
@@ -981,17 +982,21 @@ public class AtlasClient {
     }
 
     /**
-     * Search using gremlin/dsl/full text
+     * Search using dsl/full text
      * @param searchQuery
-     * @return
+     * @param limit number of rows to be returned in the result, used for pagination. maxlimit > limit > 0. -1 maps to atlas.search.defaultlimit property value
+     * @param offset offset to the results returned, used for pagination. offset >= 0. -1 maps to offset 0
+     * @return Query results
      * @throws AtlasServiceException
      */
-    public JSONArray search(final String searchQuery) throws AtlasServiceException {
+    public JSONArray search(final String searchQuery, final int limit, final int offset) throws AtlasServiceException {
         JSONObject result = callAPIWithRetries(API.SEARCH, null, new ResourceCreator() {
             @Override
             public WebResource createResource() {
                 WebResource resource = getResource(API.SEARCH);
                 resource = resource.queryParam(QUERY, searchQuery);
+                resource = resource.queryParam(LIMIT, String.valueOf(limit));
+                resource = resource.queryParam(OFFSET, String.valueOf(offset));
                 return resource;
             }
         });
@@ -1006,39 +1011,20 @@ public class AtlasClient {
     /**
      * Search given query DSL
      * @param query DSL query
+     * @param limit number of rows to be returned in the result, used for pagination. maxlimit > limit > 0. -1 maps to atlas.search.defaultlimit property value
+     * @param offset offset to the results returned, used for pagination. offset >= 0. -1 maps to offset 0
      * @return result json object
      * @throws AtlasServiceException
      */
-    public JSONArray searchByDSL(final String query) throws AtlasServiceException {
+    public JSONArray searchByDSL(final String query, final int limit, final int offset) throws AtlasServiceException {
         LOG.debug("DSL query: {}", query);
         JSONObject result = callAPIWithRetries(API.SEARCH_DSL, null, new ResourceCreator() {
             @Override
             public WebResource createResource() {
                 WebResource resource = getResource(API.SEARCH_DSL);
                 resource = resource.queryParam(QUERY, query);
-                return resource;
-            }
-        });
-        try {
-            return result.getJSONArray(RESULTS);
-        } catch (JSONException e) {
-            throw new AtlasServiceException(e);
-        }
-    }
-
-    /**
-     * Search given gremlin query
-     * @param gremlinQuery Gremlin query
-     * @return result json object
-     * @throws AtlasServiceException
-     */
-    public JSONArray searchByGremlin(final String gremlinQuery) throws AtlasServiceException {
-        LOG.debug("Gremlin query: " + gremlinQuery);
-        JSONObject result = callAPIWithRetries(API.SEARCH_GREMLIN, null, new ResourceCreator() {
-            @Override
-            public WebResource createResource() {
-                WebResource resource = getResource(API.SEARCH_GREMLIN);
-                resource = resource.queryParam(QUERY, gremlinQuery);
+                resource = resource.queryParam(LIMIT, String.valueOf(limit));
+                resource = resource.queryParam(OFFSET, String.valueOf(offset));
                 return resource;
             }
         });
@@ -1052,15 +1038,20 @@ public class AtlasClient {
     /**
      * Search given full text search
      * @param query Query
+     * @param limit number of rows to be returned in the result, used for pagination. maxlimit > limit > 0. -1 maps to atlas.search.defaultlimit property value
+     * @param offset offset to the results returned, used for pagination. offset >= 0. -1 maps to offset 0
+     * NOTE: Pagination is not implemented currently for full text search, so limit and offset are not used
      * @return result json object
      * @throws AtlasServiceException
      */
-    public JSONObject searchByFullText(final String query) throws AtlasServiceException {
+    public JSONObject searchByFullText(final String query, final int limit, final int offset) throws AtlasServiceException {
         return callAPIWithRetries(API.SEARCH_FULL_TEXT, null, new ResourceCreator() {
             @Override
             public WebResource createResource() {
                 WebResource resource = getResource(API.SEARCH_FULL_TEXT);
                 resource = resource.queryParam(QUERY, query);
+                resource = resource.queryParam(LIMIT, String.valueOf(limit));
+                resource = resource.queryParam(OFFSET, String.valueOf(offset));
                 return resource;
             }
         });

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/085d5c86/common/src/main/java/org/apache/atlas/AtlasProperties.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/atlas/AtlasProperties.java b/common/src/main/java/org/apache/atlas/AtlasProperties.java
new file mode 100644
index 0000000..df1bccb
--- /dev/null
+++ b/common/src/main/java/org/apache/atlas/AtlasProperties.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas;
+
+import org.apache.commons.configuration.Configuration;
+
+/**
+ * Utility for reading properties in atlas-application.properties.
+ */
+public final class AtlasProperties {
+    private static final Configuration APPLICATION_PROPERTIES;
+
+    private AtlasProperties() { } // utility class: static access only
+
+    static {
+        try {
+            APPLICATION_PROPERTIES = ApplicationProperties.get();
+        } catch (AtlasException e) {
+            throw new RuntimeException(e); // unchecked, since a static initializer cannot throw checked exceptions
+        }
+    }
+
+    /**
+     * Enum that encapsulates each property name and its default value.
+     */
+    public enum AtlasProperty {
+        SEARCH_MAX_LIMIT("atlas.search.maxlimit", 10000),
+        SEARCH_DEFAULT_LIMIT("atlas.search.defaultlimit", 100);
+
+        private final String propertyName;
+        private final Object defaultValue;
+
+        AtlasProperty(String propertyName, Object defaultValue) {
+            this.propertyName = propertyName;
+            this.defaultValue = defaultValue;
+        }
+    }
+
+    /** Returns the configured value, or the default when unset; Integer-typed defaults are returned as Integer. */
+    public static <T> T getProperty(AtlasProperty property) {
+        if (property.defaultValue instanceof Integer) {
+            // Values loaded from a properties file are Strings; a bare cast would fail at the caller's Integer cast.
+            return (T) APPLICATION_PROPERTIES.getInteger(property.propertyName, (Integer) property.defaultValue);
+        }
+        Object value = APPLICATION_PROPERTIES.getProperty(property.propertyName);
+        return (T) (value == null ? property.defaultValue : value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/085d5c86/common/src/main/java/org/apache/atlas/utils/ParamChecker.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/atlas/utils/ParamChecker.java b/common/src/main/java/org/apache/atlas/utils/ParamChecker.java
index edfe355..2b06f22 100644
--- a/common/src/main/java/org/apache/atlas/utils/ParamChecker.java
+++ b/common/src/main/java/org/apache/atlas/utils/ParamChecker.java
@@ -158,7 +158,7 @@ public final class ParamChecker {
      * @param maxValue
      * @param name
      */
-    public static void lessThan(short value, short maxValue, String name) {
+    public static void lessThan(long value, long maxValue, String name) {
         if (value <= 0) {
             throw new IllegalArgumentException(name + " should be > 0, current value " + value);
         }
@@ -166,4 +166,10 @@ public final class ParamChecker {
             throw new IllegalArgumentException(name + " should be <= " + maxValue + ", current value " + value);
         }
     }
+
+    public static void greaterThan(long value, long minValue, String name) {
+        if (value <= minValue) { // exclusive lower bound: a value equal to minValue is rejected too
+            throw new IllegalArgumentException(name + " should be > " + minValue + ", current value " + value);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/085d5c86/docs/src/site/twiki/Configuration.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Configuration.twiki b/docs/src/site/twiki/Configuration.twiki
index 3ad0fbe..2e077d3 100644
--- a/docs/src/site/twiki/Configuration.twiki
+++ b/docs/src/site/twiki/Configuration.twiki
@@ -134,6 +134,18 @@ atlas.lineage.hive.table.schema.query=hive_table where name=?, columns
 </verbatim>
 
 
+---++ Search Configs
+Search APIs (DSL and full text search) support pagination and accept optional limit and offset arguments. The following configs are related to search pagination:
+
+<verbatim>
+# Default limit used when limit is not specified in API
+atlas.search.defaultlimit=100
+
+# Maximum limit allowed in API. Limits maximum results that can be fetched to make sure the atlas server doesn't run out of memory
+atlas.search.maxlimit=10000
+</verbatim>
+
+
 ---++ Notification Configs
 Refer http://kafka.apache.org/documentation.html#configuration for Kafka configuration. All Kafka configs should be prefixed with 'atlas.kafka.'
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/085d5c86/docs/src/site/twiki/Search.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Search.twiki b/docs/src/site/twiki/Search.twiki
index 58c9238..5b9431a 100644
--- a/docs/src/site/twiki/Search.twiki
+++ b/docs/src/site/twiki/Search.twiki
@@ -11,9 +11,9 @@ The grammar for the DSL is below.
 <verbatim>
 queryWithPath: query ~ opt(WITHPATH)
 
-query: rep1sep(singleQuery, opt(COMMA))
+query: querySrc ~ opt(loopExpression) ~ opt(selectClause) ~ opt(orderby) ~ opt(limitOffset)
 
-singleQuery: singleQrySrc ~ opt(loopExpression) ~ opt(selectClause) ~ opt(orderby) ~ opt(limitOffset)
+querySrc: rep1sep(singleQrySrc, opt(COMMA))
 
 singleQrySrc = FROM ~ fromSrc ~ opt(WHERE) ~ opt(expr ^? notIdExpression) |
         WHERE ~ (expr ^? notIdExpression) |
@@ -22,7 +22,7 @@ singleQrySrc = FROM ~ fromSrc ~ opt(WHERE) ~ opt(expr ^? notIdExpression) |
 
 fromSrc: identifier ~ AS ~ alias | identifier
 
-orderby: ORDERBY ~ order ~ opt (sortOrder) 
+orderby: ORDERBY ~ expr ~ opt (sortOrder)
     
 limitOffset: LIMIT ~ lmt ~ opt (offset)
     
@@ -87,24 +87,30 @@ Language Notes:
    * The _!WithPath_ clause can be used with transitive closure queries to retrieve the Path that
  connects the two related Entities. (We also provide a higher level interface for Closure Queries
   see scaladoc for 'org.apache.atlas.query.ClosureQuery')
-   * ORDERBY is optional. Orderby clause should be specified in single quote ('). When order by clause is specified case insensitive sorting is done in ascending order.
-    For sorting in descending order specify 'DESC' after order by clause. If no order by is specified then no default sorting is applied.
+   * ORDERBY is optional. When an order by clause is specified, case-insensitive sorting is done based on the column specified.
+    For sorting in descending order, specify 'DESC' after the order by clause. If no order by is specified, then no default sorting is applied.
    * LIMIT is optional. It limits the maximum number of objects to be fetched starting from specified optional offset. If no offset is specified count starts from beginning.
    * There are couple of Predicate functions different from SQL:
       * _is_ or _isa_can be used to filter Entities that have a particular Trait.
       * _has_ can be used to filter Entities that have a value for a particular Attribute.
-   * When querying for a space delimited multiple-word identifier, it need to be enclosed within
-   backquote (`)
+   * Any identifiers or constants with special characters (space, $, ", {, }) should be enclosed within backquotes (`)
 
 ---+++ DSL Examples
-
-   * from DB
+For the model,
+   * Asset - attributes name, owner, description
+   * DB - supertype Asset - attributes clusterName, parameters, comment
+   * Column - supertype Asset - attributes type, comment
+   * Table - supertype Asset - attributes db, columns, parameters, comment
+   * Traits - PII, Log Data
+
+DSL queries:
+   * from DB
    * DB where name="Reporting" select name, owner
-   * DB where name="Reporting" select name, owner orderby 'name'
+   * DB where name="Reporting" select name, owner orderby name
    * DB where name="Reporting" select name limit 10
    * DB where name="Reporting" select name, owner limit 10 offset 0
-   * DB where name="Reporting" select name, owner orderby 'name' limit 10 offset 5
-   * DB where name="Reporting" select name, owner orderby 'name' desc limit 10 offset 5
+   * DB where name="Reporting" select name, owner orderby name limit 10 offset 5
+   * DB where name="Reporting" select name, owner orderby name desc limit 10 offset 5
    * DB has name
    * DB is !JdbcAccess
    * Column where Column isa PII
@@ -112,7 +118,6 @@ Language Notes:
    * Table where name="sales_fact", columns as column select column.name, column.dataType, column.comment
    * `Log Data`
 
-
 ---++ Full-text Search
 
 Atlas also exposes a lucene style full-text search capability.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/085d5c86/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index ef9f67f..3dfe614 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -6,6 +6,7 @@ INCOMPATIBLE CHANGES:
 
 
 ALL CHANGES:
+ATLAS-347 Atlas search APIs should allow pagination of results (shwethags)
 ATLAS-639 Exception for lineage request (svimal2106 via shwethags)
 ATLAS-1022 Update typesystem wiki with details (yhemanth via shwethags)
 ATLAS-1021 Update Atlas architecture wiki (yhemanth via sumasai)

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/085d5c86/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java
index c6790de..1ded435 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java
@@ -22,12 +22,14 @@ import com.thinkaurelius.titan.core.TitanGraph;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasException;
+import org.apache.atlas.AtlasProperties;
 import org.apache.atlas.GraphTransaction;
 import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy;
 import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
 import org.apache.atlas.query.GremlinQueryResult;
 import org.apache.atlas.query.InputLineageClosureQuery;
 import org.apache.atlas.query.OutputLineageClosureQuery;
+import org.apache.atlas.query.QueryParams;
 import org.apache.atlas.repository.MetadataRepository;
 import org.apache.atlas.repository.graph.GraphProvider;
 import org.apache.atlas.typesystem.exception.EntityNotFoundException;
@@ -173,7 +175,8 @@ public class DataSetLineageService implements LineageService {
     private String getSchemaForId(String typeName, String guid) throws DiscoveryException {
         final String schemaQuery =
                 String.format(propertiesConf.getString(DATASET_SCHEMA_QUERY_PREFIX + typeName), guid);
-        return discoveryService.searchByDSL(schemaQuery);
+        int limit = AtlasProperties.getProperty(AtlasProperties.AtlasProperty.SEARCH_MAX_LIMIT);
+        return discoveryService.searchByDSL(schemaQuery, new QueryParams(limit, 0));
     }
 
     @Override
@@ -192,7 +195,7 @@ public class DataSetLineageService implements LineageService {
      */
     private ReferenceableInstance validateDatasetNameExists(String datasetName) throws AtlasException {
         final String tableExistsQuery = String.format(DATASET_NAME_EXISTS_QUERY, datasetName);
-        GremlinQueryResult queryResult = discoveryService.evaluate(tableExistsQuery);
+        GremlinQueryResult queryResult = discoveryService.evaluate(tableExistsQuery, new QueryParams(1, 0));
         if (!(queryResult.rows().length() > 0)) {
             throw new EntityNotFoundException(datasetName + " does not exist");
         }
@@ -207,7 +210,7 @@ public class DataSetLineageService implements LineageService {
      */
     private String validateDatasetExists(String guid) throws AtlasException {
         final String datasetExistsQuery = String.format(DATASET_EXISTS_QUERY, guid);
-        GremlinQueryResult queryResult = discoveryService.evaluate(datasetExistsQuery);
+        GremlinQueryResult queryResult = discoveryService.evaluate(datasetExistsQuery, new QueryParams(1, 0));
         if (!(queryResult.rows().length() > 0)) {
             throw new EntityNotFoundException("Dataset with guid = " + guid + " does not exist");
         }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/085d5c86/repository/src/main/java/org/apache/atlas/discovery/DiscoveryService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/DiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/DiscoveryService.java
new file mode 100644
index 0000000..e86047e
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/discovery/DiscoveryService.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.discovery;
+
+import org.apache.atlas.query.QueryParams;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Metadata discovery service.
+ */
+public interface DiscoveryService {
+
+    /**
+     * Searches using Full text query
+     * @param query query string
+     * @param queryParams Default query parameters like limit, offset
+     * @return results json
+     * @throws DiscoveryException
+     */
+    String searchByFullText(String query, QueryParams queryParams) throws DiscoveryException;
+
+    /**
+     * Searches using DSL query
+     * @param dslQuery query string
+     * @param queryParams Default query parameters like limit, offset
+     * @return results json
+     * @throws DiscoveryException
+     */
+    String searchByDSL(String dslQuery, QueryParams queryParams) throws DiscoveryException;
+
+    /**
+     * Assumes the User is familiar with the persistence structure of the Repository.
+     * The given query is run uninterpreted against the underlying Graph Store.
+     * The results are returned as a List of Rows. Each row is a Map of Key,Value pairs.
+     *
+     * @param gremlinQuery query in gremlin dsl format
+     * @return List of Maps
+     * @throws org.apache.atlas.discovery.DiscoveryException
+     */
+    List<Map<String, String>> searchByGremlin(String gremlinQuery) throws DiscoveryException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/085d5c86/repository/src/main/java/org/apache/atlas/discovery/graph/DefaultGraphPersistenceStrategy.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/graph/DefaultGraphPersistenceStrategy.java b/repository/src/main/java/org/apache/atlas/discovery/graph/DefaultGraphPersistenceStrategy.java
index e07a54e..b17eec7 100755
--- a/repository/src/main/java/org/apache/atlas/discovery/graph/DefaultGraphPersistenceStrategy.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/graph/DefaultGraphPersistenceStrategy.java
@@ -142,7 +142,10 @@ public class DefaultGraphPersistenceStrategy implements GraphPersistenceStrategi
                 if (dataType.getName().equals(idType.getName())) {
                     structInstance.set(idType.typeNameAttrName(), GraphHelper.getProperty(structVertex, typeAttributeName()));
                     structInstance.set(idType.idAttrName(), GraphHelper.getProperty(structVertex, idAttributeName()));
-                    structInstance.set(idType.stateAttrName(), GraphHelper.getProperty(structVertex, stateAttributeName()));
+                    String stateValue = GraphHelper.getProperty(structVertex, stateAttributeName());
+                    if (stateValue != null) {
+                        structInstance.set(idType.stateAttrName(), stateValue);
+                    }
                 } else {
                     metadataRepository.getGraphToInstanceMapper()
                         .mapVertexToInstance(structVertex, structInstance, structType.fieldMapping().fields);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/085d5c86/repository/src/main/java/org/apache/atlas/discovery/graph/GraphBackedDiscoveryService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/graph/GraphBackedDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/graph/GraphBackedDiscoveryService.java
index 6def78b..0bf4622 100755
--- a/repository/src/main/java/org/apache/atlas/discovery/graph/GraphBackedDiscoveryService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/graph/GraphBackedDiscoveryService.java
@@ -32,6 +32,7 @@ import org.apache.atlas.query.GremlinEvaluator;
 import org.apache.atlas.query.GremlinQuery;
 import org.apache.atlas.query.GremlinQueryResult;
 import org.apache.atlas.query.GremlinTranslator;
+import org.apache.atlas.query.QueryParams;
 import org.apache.atlas.query.QueryParser;
 import org.apache.atlas.query.QueryProcessor;
 import org.apache.atlas.repository.Constants;
@@ -83,8 +84,8 @@ public class GraphBackedDiscoveryService implements DiscoveryService {
     // .html#query-string-syntax for query syntax
     @Override
     @GraphTransaction
-    public String searchByFullText(String query) throws DiscoveryException {
-        String graphQuery = String.format("v.%s:(%s)", Constants.ENTITY_TEXT_PROPERTY_KEY, query);
+    public String searchByFullText(String query, QueryParams queryParams) throws DiscoveryException {
+        String graphQuery = String.format("v.\"%s\":(%s)", Constants.ENTITY_TEXT_PROPERTY_KEY, query);
         LOG.debug("Full text query: {}", graphQuery);
         Iterator<TitanIndexQuery.Result<Vertex>> results =
                 titanGraph.indexQuery(Constants.FULLTEXT_INDEX, graphQuery).vertices().iterator();
@@ -112,27 +113,20 @@ public class GraphBackedDiscoveryService implements DiscoveryService {
         return response.toString();
     }
 
-    /**
-     * Search using query DSL.
-     *
-     * @param dslQuery query in DSL format.
-     * @return JSON representing the type and results.
-     */
     @Override
     @GraphTransaction
-    public String searchByDSL(String dslQuery) throws DiscoveryException {
-        LOG.info("Executing dsl query={}", dslQuery);
-        GremlinQueryResult queryResult = evaluate(dslQuery);
+    public String searchByDSL(String dslQuery, QueryParams queryParams) throws DiscoveryException {
+        GremlinQueryResult queryResult = evaluate(dslQuery, queryParams);
         return queryResult.toJson();
     }
 
-    public GremlinQueryResult evaluate(String dslQuery) throws DiscoveryException {
+    public GremlinQueryResult evaluate(String dslQuery, QueryParams queryParams) throws DiscoveryException {
         LOG.info("Executing dsl query={}", dslQuery);
         try {
-            Either<Parsers.NoSuccess, Expressions.Expression> either = QueryParser.apply(dslQuery);
+            Either<Parsers.NoSuccess, Expressions.Expression> either = QueryParser.apply(dslQuery, queryParams);
             if (either.isRight()) {
                 Expressions.Expression expression = either.right().get();
-                return evaluate(expression);
+                return evaluate(dslQuery, expression);
             } else {
                 throw new DiscoveryException("Invalid expression : " + dslQuery + ". " + either.left());
             }
@@ -141,8 +135,16 @@ public class GraphBackedDiscoveryService implements DiscoveryService {
         }
     }
 
-    public GremlinQueryResult evaluate(Expressions.Expression expression) {
+    private GremlinQueryResult evaluate(String dslQuery, Expressions.Expression expression) {
         Expressions.Expression validatedExpression = QueryProcessor.validate(expression);
+
+        //If the final limit is 0, don't launch the query, return with 0 rows
+        if (validatedExpression instanceof Expressions.LimitExpression
+                && ((Expressions.LimitExpression) validatedExpression).limit().rawValue() == 0) {
+            return new GremlinQueryResult(dslQuery, validatedExpression.dataType(),
+                    scala.collection.immutable.List.empty());
+        }
+
         GremlinQuery gremlinQuery = new GremlinTranslator(validatedExpression, graphPersistenceStrategy).translate();
         LOG.debug("Query = {}", validatedExpression);
         LOG.debug("Expression Tree = {}", validatedExpression.treeString());
@@ -176,40 +178,42 @@ public class GraphBackedDiscoveryService implements DiscoveryService {
         }
     }
 
-    private List<Map<String, String>> extractResult(Object o) throws DiscoveryException {
-        if (!(o instanceof List)) {
-            throw new DiscoveryException(String.format("Cannot process result %s", o.toString()));
-        }
-
-        List l = (List) o;
+    private List<Map<String, String>> extractResult(final Object o) throws DiscoveryException {
         List<Map<String, String>> result = new ArrayList<>();
-        for (Object r : l) {
-
-            Map<String, String> oRow = new HashMap<>();
-            if (r instanceof Map) {
-                @SuppressWarnings("unchecked") Map<Object, Object> iRow = (Map) r;
-                for (Map.Entry e : iRow.entrySet()) {
-                    Object k = e.getKey();
-                    Object v = e.getValue();
-                    oRow.put(k.toString(), v.toString());
-                }
-            } else if (r instanceof TitanVertex) {
-                Iterable<TitanProperty> ps = ((TitanVertex) r).getProperties();
-                for (TitanProperty tP : ps) {
-                    String pName = tP.getPropertyKey().getName();
-                    Object pValue = ((TitanVertex) r).getProperty(pName);
-                    if (pValue != null) {
-                        oRow.put(pName, pValue.toString());
+        if (o instanceof List) {
+            List l = (List) o;
+            for (Object r : l) {
+
+                Map<String, String> oRow = new HashMap<>();
+                if (r instanceof Map) {
+                    @SuppressWarnings("unchecked") Map<Object, Object> iRow = (Map) r;
+                    for (Map.Entry e : iRow.entrySet()) {
+                        Object k = e.getKey();
+                        Object v = e.getValue();
+                        oRow.put(k.toString(), v.toString());
+                    }
+                } else if (r instanceof TitanVertex) {
+                    Iterable<TitanProperty> ps = ((TitanVertex) r).getProperties();
+                    for (TitanProperty tP : ps) {
+                        String pName = tP.getPropertyKey().getName();
+                        Object pValue = ((TitanVertex) r).getProperty(pName);
+                        if (pValue != null) {
+                            oRow.put(pName, pValue.toString());
+                        }
                     }
+
+                } else if (r instanceof String) {
+                    oRow.put("", r.toString());
+                } else {
+                    throw new DiscoveryException(String.format("Cannot process result %s", o.toString()));
                 }
 
-            } else if (r instanceof String) {
-                oRow.put("", r.toString());
-            } else {
-                throw new DiscoveryException(String.format("Cannot process result %s", o.toString()));
+                result.add(oRow);
             }
-
-            result.add(oRow);
+        } else {
+            result.add(new HashMap<String, String>() {{
+                put("result", o.toString());
+            }});
         }
         return result;
     }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/085d5c86/repository/src/main/scala/org/apache/atlas/query/Expressions.scala
----------------------------------------------------------------------
diff --git a/repository/src/main/scala/org/apache/atlas/query/Expressions.scala b/repository/src/main/scala/org/apache/atlas/query/Expressions.scala
index ab7e81c..297aa2b 100755
--- a/repository/src/main/scala/org/apache/atlas/query/Expressions.scala
+++ b/repository/src/main/scala/org/apache/atlas/query/Expressions.scala
@@ -18,11 +18,8 @@
 
 package org.apache.atlas.query
 
-import java.util
-
 import com.google.common.collect.ImmutableCollection
 import org.apache.atlas.AtlasException
-import org.apache.atlas.typesystem.ITypedInstance
 import org.apache.atlas.typesystem.types.DataTypes.{ArrayType, PrimitiveType, TypeCategory}
 import org.apache.atlas.typesystem.types._
 
@@ -35,15 +32,15 @@ object Expressions {
         extends AtlasException(message, cause, enableSuppression, writableStackTrace) {
 
         def this(e: Expression, message: String) {
-            this(e, message, null, false, false)
+            this(e, message, null, false, true)
         }
 
         def this(e: Expression, message: String, cause: Throwable) {
-            this(e, message, cause, false, false)
+            this(e, message, cause, false, true)
         }
 
         def this(e: Expression, cause: Throwable) {
-            this(e, null, cause, false, false)
+            this(e, null, cause, false, true)
         }
 
         override def getMessage: String = {
@@ -333,7 +330,7 @@ object Expressions {
 
         def limit(lmt: Literal[Integer], offset : Literal[Integer]) = new LimitExpression(this, lmt, offset)
 
-        def order(odr: String, asc: Boolean) = new OrderExpression(this, odr, asc)
+        def order(odr: Expression, asc: Boolean) = new OrderExpression(this, odr, asc)
     }
 
     trait BinaryNode {
@@ -775,9 +772,9 @@ object Expressions {
     override def toString = s"$child withPath"
   }
 
-  case class LimitExpression(child: Expression, limit: Literal[Integer], offset: Literal[Integer]) extends Expression with UnaryNode { 
+  case class LimitExpression(child: Expression, limit: Literal[Integer], offset: Literal[Integer]) extends Expression with UnaryNode {
 
-    override def toString = s"$child  limit $limit offset $offset "
+    override def toString = s"$child limit $limit offset $offset "
 
     lazy val dataType = {
             if (!resolved) {
@@ -788,9 +785,9 @@ object Expressions {
     }
   }
 
-  case class OrderExpression(child: Expression, odr: String, asc: Boolean) extends Expression with UnaryNode {
+  case class OrderExpression(child: Expression, odr: Expression, asc: Boolean) extends Expression with UnaryNode {
 
-    override def toString = s"$child  order $odr asc $asc"
+    override def toString = s"$child orderby $odr asc $asc"
 
     lazy val dataType = {
             if (!resolved) {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/085d5c86/repository/src/main/scala/org/apache/atlas/query/GremlinQuery.scala
----------------------------------------------------------------------
diff --git a/repository/src/main/scala/org/apache/atlas/query/GremlinQuery.scala b/repository/src/main/scala/org/apache/atlas/query/GremlinQuery.scala
index ee221c9..d336f1e 100755
--- a/repository/src/main/scala/org/apache/atlas/query/GremlinQuery.scala
+++ b/repository/src/main/scala/org/apache/atlas/query/GremlinQuery.scala
@@ -317,13 +317,20 @@ class GremlinTranslator(expr: Expression,
           s"${genQuery(child, inSelect)}.path"
         }
         case order@OrderExpression(child, odr, asc) => {
+          var orderExpression = odr
+          if(odr.isInstanceOf[BackReference]) { orderExpression = odr.asInstanceOf[BackReference].reference }
+          else if (odr.isInstanceOf[AliasExpression]) { orderExpression = odr.asInstanceOf[AliasExpression].child}
+          val orderbyProperty = genQuery(orderExpression, false)
+          val bProperty = s"it.b.$orderbyProperty"
+          val aProperty = s"it.a.$orderbyProperty"
+          val aCondition = s"($aProperty != null ? $aProperty.toLowerCase(): $aProperty)"
+          val bCondition = s"($bProperty != null ? $bProperty.toLowerCase(): $bProperty)"
           var orderby = ""
-             asc  match {
+            asc  match {
             //builds a closure comparison function based on provided order by clause in DSL. This will be used to sort the results by gremlin order pipe.
             //Ordering is case insensitive.
-             case false=> orderby = s"order{(it.b.getProperty('$odr') !=null ? it.b.getProperty('$odr').toLowerCase(): it.b.getProperty('$odr')) <=> (it.a.getProperty('$odr') != null ? it.a.getProperty('$odr').toLowerCase(): it.a.getProperty('$odr'))}"//descending
-              case _ => orderby = s"order{(it.a.getProperty('$odr') != null ? it.a.getProperty('$odr').toLowerCase(): it.a.getProperty('$odr')) <=> (it.b.getProperty('$odr') !=null ? it.b.getProperty('$odr').toLowerCase(): it.b.getProperty('$odr'))}"
-              
+              case false=> orderby = s"order{$bCondition <=> $aCondition}"//descending
+              case _ => orderby = s"order{$aCondition <=> $bCondition}"
             }
           s"""${genQuery(child, inSelect)}.$orderby"""
         }
@@ -410,7 +417,7 @@ class GremlinTranslator(expr: Expression,
         e1 = e1.transformUp(traitClauseWithInstanceForTop(e1))
         
         //Following code extracts the select expressions from expression tree.
-        
+
              val  se = SelectExpressionHelper.extractSelectExpression(e1)
              if (se.isDefined)
              {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/085d5c86/repository/src/main/scala/org/apache/atlas/query/QueryParser.scala
----------------------------------------------------------------------
diff --git a/repository/src/main/scala/org/apache/atlas/query/QueryParser.scala b/repository/src/main/scala/org/apache/atlas/query/QueryParser.scala
index 60b57d9..4d2429e 100755
--- a/repository/src/main/scala/org/apache/atlas/query/QueryParser.scala
+++ b/repository/src/main/scala/org/apache/atlas/query/QueryParser.scala
@@ -93,7 +93,7 @@ trait ExpressionUtils {
         input.limit(lmt, offset) 
     }
     
-    def order(input: Expression, odr: String, asc: Boolean) = {
+    def order(input: Expression, odr: Expression, asc: Boolean) = {
         input.order(odr, asc)
     }
         
@@ -118,6 +118,9 @@ trait ExpressionUtils {
         sngQuery2.transformUp(replaceIdWithField(leftSrcId, snglQuery1.field(leftSrcId.name)))
     }
 }
+
+case class QueryParams(limit: Int, offset: Int)
+
 /**
  * Query parser is used to parse the DSL query. It uses scala PackratParsers and pattern matching to extract the expressions.
  * It builds up an expression tree.
@@ -134,7 +137,12 @@ object QueryParser extends StandardTokenParsers with QueryKeywords with Expressi
 
     override val lexical = new QueryLexer(queryreservedWords, querydelims)
 
-    def apply(input: String): Either[NoSuccess, Expression] = synchronized {
+  /**
+    * @param input query string
+    * @param queryParams query parameters that contains limit and offset
+    * @return Right(expression) when the input parses successfully, Left(NoSuccess) otherwise
+    */
+    def apply(input: String)(implicit queryParams: QueryParams = null): Either[NoSuccess, Expression] = synchronized {
         phrase(queryWithPath)(new lexical.Scanner(input)) match {
             case Success(r, x) => Right(r)
             case f@Failure(m, x) => Left(f)
@@ -142,23 +150,21 @@ object QueryParser extends StandardTokenParsers with QueryKeywords with Expressi
         }
     }
 
-    def queryWithPath = query ~ opt(WITHPATH) ^^ {
+    import scala.math._
+
+    def queryWithPath(implicit queryParams: QueryParams) = query ~ opt(WITHPATH) ^^ {
       case q ~ None => q
       case q ~ p => q.path()
     }
 
-    def query: Parser[Expression] = rep1sep(singleQuery, opt(COMMA)) ^^ { l => l match {
-        case h :: Nil => h
-        case h :: t => t.foldLeft(h)(merge(_, _))
-    }
-    }
-      /**
+    /**
      * A singleQuery can have the following forms:
      * 1. SrcQuery [select] [orderby desc] [Limit x offset y] -> source query followed by optional select statement followed by optional order by followed by optional limit
      * eg: Select "hive_db where hive_db has name orderby 'hive_db.owner' limit 2 offset 1"
-     * @return
+        *
+        * @return
      */
-    def singleQuery = singleQrySrc ~ opt(loopExpression) ~ opt(selectClause) ~ opt(orderby) ~ opt(limitOffset) ^^ {
+    def query(implicit queryParams: QueryParams) = querySrc ~ opt(loopExpression) ~ opt(selectClause) ~ opt(orderby) ~ opt(limitOffset) ^^ {
       case s ~ l ~ sel ~ odr ~ lmtoff => {
         var expressiontree = s
         if (l.isDefined) //Note: The order of if statements is important. 
@@ -169,18 +175,30 @@ object QueryParser extends StandardTokenParsers with QueryKeywords with Expressi
         {
           expressiontree = order(expressiontree, odr.get._1, odr.get._2)
         }
-        if (lmtoff.isDefined)
-        {
-          expressiontree = limit(expressiontree, int (lmtoff.get._1), int (lmtoff.get._2))
-        }
         if (sel.isDefined)
         {
           expressiontree = select(expressiontree, sel.get)
         }
+        if (queryParams != null && lmtoff.isDefined)
+        {
+          val mylimit = int(min(queryParams.limit, max(lmtoff.get._1 - queryParams.offset, 0)))
+          val myoffset = int(queryParams.offset + lmtoff.get._2)
+          expressiontree = limit(expressiontree, mylimit, myoffset)
+        } else if(lmtoff.isDefined) {
+          expressiontree = limit(expressiontree, int(lmtoff.get._1), int(lmtoff.get._2))
+        } else if(queryParams != null) {
+          expressiontree = limit(expressiontree, int(queryParams.limit), int(queryParams.offset))
+        }
         expressiontree
       }
     }
 
+    def querySrc: Parser[Expression] = rep1sep(singleQrySrc, opt(COMMA)) ^^ { l => l match {
+        case h :: Nil => h
+        case h :: t => t.foldLeft(h)(merge(_, _))
+    }
+    }
+
     /**
      * A SingleQuerySrc can have the following forms:
      * 1. FROM id [WHERE] [expr] -> from optionally followed by a filter
@@ -218,14 +236,14 @@ object QueryParser extends StandardTokenParsers with QueryKeywords with Expressi
     def fromSrc = identifier ~ AS ~ alias ^^ { case s ~ a ~ al => s.as(al)} |
         identifier
 
-    def orderby = ORDERBY ~ order ~ opt (asce) ^^ {
+    def orderby = ORDERBY ~ expr ~ opt (asce) ^^ {
       case o ~ odr ~ None => (odr, true)
       case o ~ odr ~ asc => (odr, asc.get)
     }
     
-    def limitOffset = LIMIT ~ lmt ~ opt (offset) ^^ {
-      case l ~ lt ~ None => (lt, 0)
-      case l ~ lt ~ of => (lt, of.get)
+    def limitOffset: Parser[(Int, Int)] = LIMIT ~ lmt ~ opt (offset) ^^ {
+      case l ~ lt ~ None => (lt.toInt, 0)
+      case l ~ lt ~ of => (lt.toInt, of.get.toInt)
     }
     
     def offset = OFFSET ~ ofset  ^^ {
@@ -237,7 +255,7 @@ object QueryParser extends StandardTokenParsers with QueryKeywords with Expressi
       case  _ => true
     }
     
-    def loopExpression: Parser[(Expression, Option[Literal[Integer]], Option[String])] =
+    def loopExpression(implicit queryParams: QueryParams): Parser[(Expression, Option[Literal[Integer]], Option[String])] =
         LOOP ~ (LPAREN ~> query <~ RPAREN) ~ opt(intConstant <~ TIMES) ~ opt(AS ~> alias) ^^ {
             case l ~ e ~ None ~ a => (e, None, a)
             case l ~ e ~ Some(i) ~ a => (e, Some(int(i)), a)
@@ -297,8 +315,6 @@ object QueryParser extends StandardTokenParsers with QueryKeywords with Expressi
     
     def ofset = intConstant
 
-    def order = ident | stringLit
-    
     def asc =  ident | stringLit
     
     def literal = booleanConstant ^^ {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/085d5c86/repository/src/main/scala/org/apache/atlas/query/Resolver.scala
----------------------------------------------------------------------
diff --git a/repository/src/main/scala/org/apache/atlas/query/Resolver.scala b/repository/src/main/scala/org/apache/atlas/query/Resolver.scala
index 5fc9400..cff92af 100755
--- a/repository/src/main/scala/org/apache/atlas/query/Resolver.scala
+++ b/repository/src/main/scala/org/apache/atlas/query/Resolver.scala
@@ -97,7 +97,7 @@ class Resolver(srcExpr: Option[Expression] = None, aliases: Map[String, Expressi
         }
         case order@OrderExpression(child, odr, asc) => {
             val r = new Resolver(Some(child), child.namedExpressions)
-            return new OrderExpression(child.transformUp(r), odr, asc)
+            return new OrderExpression(child, odr.transformUp(r), asc)
         }
         case x => x
     }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/085d5c86/repository/src/main/scala/org/apache/atlas/query/TypeUtils.scala
----------------------------------------------------------------------
diff --git a/repository/src/main/scala/org/apache/atlas/query/TypeUtils.scala b/repository/src/main/scala/org/apache/atlas/query/TypeUtils.scala
index 5a64c53..ddcc106 100755
--- a/repository/src/main/scala/org/apache/atlas/query/TypeUtils.scala
+++ b/repository/src/main/scala/org/apache/atlas/query/TypeUtils.scala
@@ -22,7 +22,7 @@ import java.util
 import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.atlas.AtlasException
-import org.apache.atlas.query.Expressions.{PathExpression, SelectExpression}
+import org.apache.atlas.query.Expressions.{LimitExpression, PathExpression, SelectExpression}
 import org.apache.atlas.repository.Constants
 import org.apache.atlas.typesystem.types.DataTypes.{ArrayType, PrimitiveType, TypeCategory}
 import org.apache.atlas.typesystem.types._
@@ -80,7 +80,7 @@ object TypeUtils {
         val resultAttr = new AttributeDefinition(resultAttrName, resultType.getName, Multiplicity.REQUIRED, false, null)
         val typName = s"${TEMP_STRUCT_NAME_PREFIX}${tempStructCounter.getAndIncrement}"
         val m : java.util.HashMap[String, IDataType[_]] = new util.HashMap[String, IDataType[_]]()
-        if ( pE.child.isInstanceOf[SelectExpression]) {
+        if (pE.child.isInstanceOf[SelectExpression] || pE.child.isInstanceOf[LimitExpression]) {
           m.put(pE.child.dataType.getName, pE.child.dataType)
         }
         typSystem.defineQueryResultType(typName, m, pathAttr, resultAttr);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/085d5c86/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java b/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java
index 460a88f..aeb03c5 100644
--- a/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java
@@ -23,6 +23,7 @@ import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.BaseRepositoryTest;
 import org.apache.atlas.RepositoryMetadataModule;
+import org.apache.atlas.query.QueryParams;
 import org.apache.atlas.typesystem.ITypedReferenceableInstance;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.typesystem.Struct;
@@ -117,7 +118,7 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
     @Test(dataProvider = "dslQueriesProvider")
     public void testSearchByDSLQueries(String dslQuery) throws Exception {
         System.out.println("Executing dslQuery = " + dslQuery);
-        String jsonResults = discoveryService.searchByDSL(dslQuery);
+        String jsonResults = discoveryService.searchByDSL(dslQuery, new QueryParams(100, 0));
         assertNotNull(jsonResults);
 
         JSONObject results = new JSONObject(jsonResults);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/085d5c86/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java b/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java
index 313a2ae..df3fe87 100755
--- a/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java
@@ -25,6 +25,7 @@ import org.apache.atlas.RepositoryMetadataModule;
 import org.apache.atlas.RequestContext;
 import org.apache.atlas.TestUtils;
 import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
+import org.apache.atlas.query.QueryParams;
 import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.MetadataRepository;
 import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
@@ -72,6 +73,7 @@ public class GraphBackedDiscoveryServiceTest extends BaseRepositoryTest {
 
     @Inject
     private GraphBackedDiscoveryService discoveryService;
+    private QueryParams queryParams = new QueryParams(40, 0);
 
     @BeforeClass
     public void setUp() throws Exception {
@@ -128,12 +130,16 @@ public class GraphBackedDiscoveryServiceTest extends BaseRepositoryTest {
         super.tearDown();
     }
 
+    private String searchByDSL(String dslQuery) throws Exception {
+        return discoveryService.searchByDSL(dslQuery, queryParams);
+    }
+
     @Test
     public void testSearchBySystemProperties() throws Exception {
         //system property in select
         String dslQuery = "from Department select __guid";
 
-        String jsonResults = discoveryService.searchByDSL(dslQuery);
+        String jsonResults = searchByDSL(dslQuery);
         assertNotNull(jsonResults);
 
         JSONObject results = new JSONObject(jsonResults);
@@ -147,7 +153,7 @@ public class GraphBackedDiscoveryServiceTest extends BaseRepositoryTest {
         //system property in where clause
         String guid = rows.getJSONObject(0).getString("__guid");
         dslQuery = "Department where __guid = '" + guid + "' and __state = 'ACTIVE'";
-        jsonResults = discoveryService.searchByDSL(dslQuery);
+        jsonResults = searchByDSL(dslQuery);
         assertNotNull(jsonResults);
 
         results = new JSONObject(jsonResults);
@@ -162,7 +168,7 @@ public class GraphBackedDiscoveryServiceTest extends BaseRepositoryTest {
     public void testSearchByDSLReturnsEntity() throws Exception {
         String dslQuery = "from Department";
 
-        String jsonResults = discoveryService.searchByDSL(dslQuery);
+        String jsonResults = searchByDSL(dslQuery);
         assertNotNull(jsonResults);
 
         JSONObject results = new JSONObject(jsonResults);
@@ -190,8 +196,7 @@ public class GraphBackedDiscoveryServiceTest extends BaseRepositoryTest {
     @Test(expectedExceptions = Throwable.class)
     public void testSearchByDSLBadQuery() throws Exception {
         String dslQuery = "from blah";
-
-        discoveryService.searchByDSL(dslQuery);
+        searchByDSL(dslQuery); // NOTE(review): expectedExceptions = Throwable.class presumably also accepts the error thrown by Assert.fail() below, so this test likely passes even if the query does not throw - confirm and consider a narrower expected type
         Assert.fail();
     }
 
@@ -383,6 +388,30 @@ public class GraphBackedDiscoveryServiceTest extends BaseRepositoryTest {
         };
     }
 
+    @DataProvider(name = "dslExplicitLimitQueriesProvider")
+    private Object[][] createDSLQueriesWithExplicitLimit() { // cases mixing limit/offset written in the DSL text with limit/offset passed as query parameters
+        return new Object[][]{ // each row: {dslQuery, expectedNumRows, limit parameter, offset parameter}
+                {"hive_column", 37, 40, 0},//with higher limit all rows returned
+                {"hive_column limit 10", 10, 50, 0},//lower limit in query
+                {"hive_column select hive_column.name limit 10", 5, 5, 0},//lower limit in query param
+                {"hive_column select hive_column.name withPath", 20, 20, 0},//limit only in params
+                //with offset, only remaining rows returned
+                {"hive_column select hive_column.name limit 40 withPath", 17, 40, 20},
+                //with higher offset, no rows returned
+                {"hive_column select hive_column.name limit 40 withPath", 0, 40, 40},
+                //offset used from query
+                {"hive_column select hive_column.name limit 40 offset 10", 27, 40, 0},
+                //offsets in query and parameter added up
+                {"hive_column select hive_column.name limit 40 offset 10", 17, 40, 10},
+                //works with where clause
+                {"hive_db where name = 'Reporting' limit 10 offset 0", 1, 40, 0},
+                //works with joins
+                {"hive_db, hive_table where db.name = 'Reporting' limit 10", 1, 1, 0},
+                {"hive_column limit 25", 5, 10, 20},    //last page should return records limited by limit in query
+                {"hive_column limit 25", 0, 10, 30},    //offset > limit returns 0 rows
+        };
+    }
+
     @DataProvider(name = "dslLimitQueriesProvider")
     private Object[][] createDSLQueriesWithLimit() {
         return new Object[][]{
@@ -390,6 +419,7 @@ public class GraphBackedDiscoveryServiceTest extends BaseRepositoryTest {
                 {"hive_column select hive_column.name limit 10 ", 10},
                 {"hive_column select hive_column.name  withPath", 37},
                 {"hive_column select hive_column.name limit 10 withPath", 10},
+
                 {"from hive_db", 3},
                 {"from hive_db limit 2", 2},
                 {"from hive_db limit 2 offset 0", 2},
@@ -543,60 +573,60 @@ public class GraphBackedDiscoveryServiceTest extends BaseRepositoryTest {
 //                {"hive_db  hive_table  orderby 'hive_db.owner'", 8, "owner", isAscending},
 //                {"hive_db hive_table orderby 'hive_db.owner' limit 5", 5, "owner", isAscending},
 //                {"hive_db hive_table orderby 'hive_db.owner' limit 5 offset 5", 3, "owner", isAscending},
-                {"hive_column select hive_column.name orderby 'hive_column.name' limit 10 withPath", 10, "name", isAscending},
-                {"hive_column select hive_column.name orderby 'hive_column.name' asc limit 10 withPath", 10, "name", isAscending},
+                {"hive_column select hive_column.name orderby name limit 10 withPath", 10, "name", isAscending},
+                {"hive_column select hive_column.name orderby name asc limit 10 withPath", 10, "name", isAscending},
                 {"hive_column select hive_column.name orderby 'hive_column.name' desc limit 10 withPath", 10, "name", !isAscending},
-                {"from hive_db orderby 'hive_db.owner' limit 3", 3, "owner", isAscending},
-                {"hive_db where hive_db.name=\"Reporting\" orderby 'owner'", 1, "owner", isAscending},
+                {"from hive_db orderby owner limit 3", 3, "owner", isAscending},
+                {"hive_db where hive_db.name=\"Reporting\" orderby owner", 1, "owner", isAscending},
                 
-                {"hive_db where hive_db.name=\"Reporting\" orderby 'hive_db.owner' limit 10 ", 1, "owner", isAscending},
-                {"hive_db where hive_db.name=\"Reporting\" select name, owner orderby 'hive_db.name' ", 1, "name", isAscending},
+                {"hive_db where hive_db.name=\"Reporting\" orderby owner limit 10", 1, "owner", isAscending},
+                {"hive_db where hive_db.name=\"Reporting\" select name, owner orderby name", 1, "name", isAscending},
                 {"hive_db has name orderby 'hive_db.owner' limit 10 offset 0", 3, "owner", isAscending},
                 
-                {"from hive_table orderby 'hive_table.owner'", 10, "owner", isAscending},
+                {"from hive_table orderby owner", 10, "owner", isAscending},
                 {"from hive_table orderby 'hive_table.owner' limit 8", 8, "owner", isAscending},
                 
-                {"hive_table orderby 'hive_table.owner'", 10, "owner", isAscending},
-                {"hive_table orderby 'hive_table.owner' limit 8", 8, "owner", isAscending},
-                {"hive_table orderby 'hive_table.owner' limit 8 offset 0", 8, "owner", isAscending},
+                {"hive_table orderby owner", 10, "owner", isAscending},
+                {"hive_table orderby owner limit 8", 8, "owner", isAscending},
+                {"hive_table orderby owner limit 8 offset 0", 8, "owner", isAscending},
                 {"hive_table orderby 'hive_table.owner' desc limit 8 offset 0", 8, "owner", !isAscending},
                 
-                {"hive_table isa Dimension orderby 'hive_table.owner'", 3, "owner", isAscending},//order not working
-                {"hive_table isa Dimension orderby 'hive_table.owner' limit 3", 3, "owner", isAscending},
-                {"hive_table isa Dimension orderby 'hive_table.owner' limit 3 offset 0", 3, "owner", isAscending},
+                {"hive_table isa Dimension orderby owner", 3, "owner", isAscending},//order not working
+                {"hive_table isa Dimension orderby owner limit 3", 3, "owner", isAscending},
+                {"hive_table isa Dimension orderby owner limit 3 offset 0", 3, "owner", isAscending},
                 {"hive_table isa Dimension orderby 'hive_table.owner' desc limit 3 offset 0", 3, "owner", !isAscending},
                 
-                {"hive_column where hive_column isa PII orderby 'hive_column.name'", 8, "name", isAscending},
-                {"hive_column where hive_column isa PII orderby 'hive_column.name' limit 5", 5, "name", isAscending},
-                {"hive_column where hive_column isa PII orderby 'hive_column.name' limit 5 offset 1", 5, "name", isAscending},
+                {"hive_column where hive_column isa PII orderby name", 8, "name", isAscending},
+                {"hive_column where hive_column isa PII orderby name limit 5", 5, "name", isAscending},
+                {"hive_column where hive_column isa PII orderby name limit 5 offset 1", 5, "name", isAscending},
                 {"hive_column where hive_column isa PII orderby 'hive_column.name' desc limit 5 offset 1", 5, "name", !isAscending},
                 
                 
-                {"hive_column select hive_column.name orderby 'hive_column.name' ", 37, "hive_column.name", isAscending},
-                {"hive_column select hive_column.name orderby 'hive_column.name' limit 5", 5, "hive_column.name", isAscending},
-                {"hive_column select hive_column.name orderby 'hive_column.name' desc limit 5", 5, "hive_column.name", !isAscending},
+                {"hive_column select hive_column.name orderby name", 37, "hive_column.name", isAscending},
+                {"hive_column select hive_column.name orderby name limit 5", 5, "hive_column.name", isAscending},
+                {"hive_column select hive_column.name orderby name desc limit 5", 5, "hive_column.name", !isAscending},
                 {"hive_column select hive_column.name orderby 'hive_column.name' limit 5 offset 36", 1, "hive_column.name", isAscending},
                 
-                {"hive_column select name orderby 'hive_column.name'", 37, "name", isAscending},
-                {"hive_column select name orderby 'hive_column.name' limit 5", 5, "name", isAscending},
+                {"hive_column select name orderby name", 37, "name", isAscending},
+                {"hive_column select name orderby name limit 5", 5, "name", isAscending},
                 {"hive_column select name orderby 'hive_column.name' desc", 37, "name", !isAscending},
                 
-                {"hive_column where hive_column.name=\"customer_id\" orderby 'hive_column.name'", 6, "name", isAscending},
-                {"hive_column where hive_column.name=\"customer_id\" orderby 'hive_column.name' limit 2", 2, "name", isAscending},
+                {"hive_column where hive_column.name=\"customer_id\" orderby name", 6, "name", isAscending},
+                {"hive_column where hive_column.name=\"customer_id\" orderby name limit 2", 2, "name", isAscending},
                 {"hive_column where hive_column.name=\"customer_id\" orderby 'hive_column.name' limit 2 offset 1", 2, "name", isAscending},
                 
-                {"from hive_table select owner orderby 'hive_table.owner'", 10, "owner", isAscending},
-                {"from hive_table select owner orderby 'hive_table.owner' limit 5", 5, "owner", isAscending},
-                {"from hive_table select owner orderby 'hive_table.owner' desc limit 5", 5, "owner", !isAscending},
+                {"from hive_table select owner orderby owner", 10, "owner", isAscending},
+                {"from hive_table select owner orderby owner limit 5", 5, "owner", isAscending},
+                {"from hive_table select owner orderby owner desc limit 5", 5, "owner", !isAscending},
                 {"from hive_table select owner orderby 'hive_table.owner' limit 5 offset 5", 5, "owner", isAscending},
                 
-                {"hive_db where (name = \"Reporting\") orderby 'hive_db.name'", 1, "name", isAscending},
+                {"hive_db where (name = \"Reporting\") orderby name", 1, "name", isAscending},
                 {"hive_db where (name = \"Reporting\") orderby 'hive_db.name' limit 10", 1, "name", isAscending},
                 {"hive_db where (name = \"Reporting\") select name as _col_0, owner as _col_1 orderby '_col_1'", 1, "_col_1", isAscending}, //will it work
-                {"hive_db where (name = \"Reporting\") select name as _col_0, owner as _col_1  orderby '_col_1' limit 10", 1, "_col_1", isAscending},
-                {"hive_db where hive_db has name orderby 'hive_db.owner'", 3, "owner", isAscending},
-                {"hive_db where hive_db has name orderby 'hive_db.owner' limit 5", 3, "owner", isAscending},
-                {"hive_db where hive_db has name orderby 'hive_db.owner' limit 2 offset 0", 2, "owner", isAscending},
+                {"hive_db where (name = \"Reporting\") select name as _col_0, owner as _col_1  orderby owner limit 10", 1, "_col_1", isAscending},
+                {"hive_db where hive_db has name orderby owner", 3, "owner", isAscending},
+                {"hive_db where hive_db has name orderby owner limit 5", 3, "owner", isAscending},
+                {"hive_db where hive_db has name orderby owner limit 2 offset 0", 2, "owner", isAscending},
                 {"hive_db where hive_db has name orderby 'hive_db.owner' limit 2 offset 1", 2, "owner", isAscending},
                 
                 {"hive_db where (name = \"Reporting\") select name as _col_0, (createTime + 1) as _col_1 orderby '_col_1'", 1, "_col_1", isAscending},
@@ -604,15 +634,15 @@ public class GraphBackedDiscoveryServiceTest extends BaseRepositoryTest {
                 {"hive_db where (name = \"Reporting\") select name as _col_0, (createTime + 1) as _col_1 orderby '_col_1' limit 10 offset 1", 0, "_col_1", isAscending},
                 {"hive_db where (name = \"Reporting\") select name as _col_0, (createTime + 1) as _col_1 orderby '_col_1' limit 10 offset 0", 1, "_col_1", isAscending},
                 
-                {"hive_table where (name = \"sales_fact\" and createTime > \"2014-01-01\" ) select name as _col_0, createTime as _col_1 orderby '_col_1' ", 1, "_col_1", isAscending},
-                {"hive_table where (name = \"sales_fact\" and createTime > \"2014-01-01\" ) select name as _col_0, createTime as _col_1 orderby '_col_1' limit 10 ", 1, "_col_1", isAscending},
-                {"hive_table where (name = \"sales_fact\" and createTime > \"2014-01-01\" ) select name as _col_0, createTime as _col_1 orderby '_col_1' limit 10 offset 0", 1, "_col_1", isAscending},
+                {"hive_table where (name = \"sales_fact\" and createTime > \"2014-01-01\" ) select name as _col_0, createTime as _col_1 orderby createTime", 1, "_col_1", isAscending},
+                {"hive_table where (name = \"sales_fact\" and createTime > \"2014-01-01\" ) select name as _col_0, createTime as _col_1 orderby createTime limit 10 ", 1, "_col_1", isAscending},
+                {"hive_table where (name = \"sales_fact\" and createTime > \"2014-01-01\" ) select name as _col_0, createTime as _col_1 orderby createTime limit 10 offset 0", 1, "_col_1", isAscending},
                 {"hive_table where (name = \"sales_fact\" and createTime > \"2014-01-01\" ) select name as _col_0, createTime as _col_1 orderby '_col_1' limit 10 offset 5", 0, "_col_1", isAscending},
                 
-                {"hive_table where (name = \"sales_fact\" and createTime >= \"2014-12-11T02:35:58.440Z\" ) select name as _col_0, createTime as _col_1 orderby '_col_0' ", 1, "_col_0", isAscending},
-                {"hive_table where (name = \"sales_fact\" and createTime >= \"2014-12-11T02:35:58.440Z\" ) select name as _col_0, createTime as _col_1 orderby '_col_0' limit 10 offset 0", 1, "_col_0", isAscending},
-                {"hive_table where (name = \"sales_fact\" and createTime >= \"2014-12-11T02:35:58.440Z\" ) select name as _col_0, createTime as _col_1 orderby '_col_0' limit 10 offset 1", 0, "_col_0", isAscending},
-                {"hive_table where (name = \"sales_fact\" and createTime >= \"2014-12-11T02:35:58.440Z\" ) select name as _col_0, createTime as _col_1 orderby '_col_0' limit 10", 1, "_col_0", isAscending},
+                {"hive_table where (name = \"sales_fact\" and createTime >= \"2014-12-11T02:35:58.440Z\" ) select name as _col_0, createTime as _col_1 orderby name", 1, "_col_0", isAscending},
+                {"hive_table where (name = \"sales_fact\" and createTime >= \"2014-12-11T02:35:58.440Z\" ) select name as _col_0, createTime as _col_1 orderby name limit 10 offset 0", 1, "_col_0", isAscending},
+                {"hive_table where (name = \"sales_fact\" and createTime >= \"2014-12-11T02:35:58.440Z\" ) select name as _col_0, createTime as _col_1 orderby name limit 10 offset 1", 0, "_col_0", isAscending},
+                {"hive_table where (name = \"sales_fact\" and createTime >= \"2014-12-11T02:35:58.440Z\" ) select name as _col_0, createTime as _col_1 orderby name limit 10", 1, "_col_0", isAscending},
                 {"hive_table where (name = \"sales_fact\" and createTime >= \"2014-12-11T02:35:58.440Z\" ) select name as _col_0, createTime as _col_1 orderby '_col_0' limit 0 offset 1", 0, "_col_0", isAscending},
 
                 
@@ -624,7 +654,7 @@ public class GraphBackedDiscoveryServiceTest extends BaseRepositoryTest {
     @Test(dataProvider = "dslOrderByQueriesProvider")
     public void  testSearchByDSLQueriesWithOrderBy(String dslQuery, Integer expectedNumRows, String orderBy, boolean ascending) throws Exception {
         System.out.println("Executing dslQuery = " + dslQuery);
-        String jsonResults = discoveryService.searchByDSL(dslQuery);
+        String jsonResults = searchByDSL(dslQuery);
         assertNotNull(jsonResults);
 
         JSONObject results = new JSONObject(jsonResults);
@@ -677,17 +707,23 @@ public class GraphBackedDiscoveryServiceTest extends BaseRepositoryTest {
     
     @Test(dataProvider = "dslQueriesProvider")
     public void  testSearchByDSLQueries(String dslQuery, Integer expectedNumRows) throws Exception {
-        runQuery(dslQuery, expectedNumRows);
+        runQuery(dslQuery, expectedNumRows, 40, 0); // limitParam = 40, offsetParam = 0
     }
 
     @Test(dataProvider = "comparisonQueriesProvider")
     public void testDataTypeComparisonQueries(String dslQuery, Integer expectedNumRows) throws Exception {
-        runQuery(dslQuery, expectedNumRows);
+        runQuery(dslQuery, expectedNumRows, 40, 0); // limitParam = 40, offsetParam = 0
+    }
+
+    @Test(dataProvider = "dslExplicitLimitQueriesProvider")
+    public void testSearchByDSLQueriesWithExplicitLimit(String dslQuery, Integer expectedNumRows, int limit, int offset)
+            throws Exception {
+        runQuery(dslQuery, expectedNumRows, limit, offset); // limit and offset are supplied per row by the data provider
     }
 
-    public void runQuery(String dslQuery, Integer expectedNumRows) throws Exception {
+    public void runQuery(String dslQuery, Integer expectedNumRows, int limitParam, int offsetParam) throws Exception {
         System.out.println("Executing dslQuery = " + dslQuery);
-        String jsonResults = discoveryService.searchByDSL(dslQuery);
+        String jsonResults = discoveryService.searchByDSL(dslQuery, new QueryParams(limitParam, offsetParam));
         assertNotNull(jsonResults);
 
         JSONObject results = new JSONObject(jsonResults);
@@ -710,9 +746,9 @@ public class GraphBackedDiscoveryServiceTest extends BaseRepositoryTest {
 
     @Test(dataProvider = "dslLimitQueriesProvider")
     public void  testSearchByDSLQueriesWithLimit(String dslQuery, Integer expectedNumRows) throws Exception {
-        runQuery(dslQuery, expectedNumRows);
+        runQuery(dslQuery, expectedNumRows, 40, 0);
     }
-    
+
     @DataProvider(name = "invalidDslQueriesProvider")
     private Object[][] createInvalidDSLQueries() {
         return new String[][]{{"from Unknown"}, {"Unknown"}, {"Unknown is Blah"},};
@@ -721,7 +757,7 @@ public class GraphBackedDiscoveryServiceTest extends BaseRepositoryTest {
     @Test(dataProvider = "invalidDslQueriesProvider", expectedExceptions = DiscoveryException.class)
     public void testSearchByDSLInvalidQueries(String dslQuery) throws Exception {
         System.out.println("Executing dslQuery = " + dslQuery);
-        discoveryService.searchByDSL(dslQuery);
+        searchByDSL(dslQuery); // expected to throw DiscoveryException; Assert.fail() below is reached only if it does not
         Assert.fail();
     }
 
@@ -731,7 +767,7 @@ public class GraphBackedDiscoveryServiceTest extends BaseRepositoryTest {
         createInstances();
 
         String dslQuery = "from D where a = 1";
-        String jsonResults = discoveryService.searchByDSL(dslQuery);
+        String jsonResults = searchByDSL(dslQuery);
         assertNotNull(jsonResults);
 
         JSONObject results = new JSONObject(jsonResults);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/085d5c86/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java
index 97dfbcd..0d02333 100755
--- a/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java
@@ -30,6 +30,7 @@ import org.apache.atlas.RepositoryMetadataModule;
 import org.apache.atlas.RequestContext;
 import org.apache.atlas.TestUtils;
 import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
+import org.apache.atlas.query.QueryParams;
 import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.RepositoryException;
 import org.apache.atlas.typesystem.IStruct;
@@ -91,6 +92,7 @@ public class GraphBackedMetadataRepositoryTest {
 
     private TypeSystem typeSystem;
     private String guid;
+    private QueryParams queryParams = new QueryParams(100, 0); // paging passed to the discoveryService searches in this test: limit 100, offset 0
 
     @BeforeClass
     public void setUp() throws Exception {
@@ -424,7 +426,7 @@ public class GraphBackedMetadataRepositoryTest {
     public void testSearchByDSLQuery() throws Exception {
         String dslQuery = "hive_database as PII";
         System.out.println("Executing dslQuery = " + dslQuery);
-        String jsonResults = discoveryService.searchByDSL(dslQuery);
+        String jsonResults = discoveryService.searchByDSL(dslQuery, queryParams);
         Assert.assertNotNull(jsonResults);
 
         JSONObject results = new JSONObject(jsonResults);
@@ -457,7 +459,7 @@ public class GraphBackedMetadataRepositoryTest {
     public void testSearchByDSLWithInheritance() throws Exception {
         String dslQuery = "Person where name = 'Jane'";
         System.out.println("Executing dslQuery = " + dslQuery);
-        String jsonResults = discoveryService.searchByDSL(dslQuery);
+        String jsonResults = discoveryService.searchByDSL(dslQuery, queryParams);
         Assert.assertNotNull(jsonResults);
 
         JSONObject results = new JSONObject(jsonResults);
@@ -488,7 +490,7 @@ public class GraphBackedMetadataRepositoryTest {
         TestUtils.dumpGraph(graphProvider.get());
 
         System.out.println("Executing dslQuery = " + dslQuery);
-        String jsonResults = discoveryService.searchByDSL(dslQuery);
+        String jsonResults = discoveryService.searchByDSL(dslQuery, queryParams);
         Assert.assertNotNull(jsonResults);
 
         JSONObject results = new JSONObject(jsonResults);
@@ -522,7 +524,7 @@ public class GraphBackedMetadataRepositoryTest {
 
         //person in hr department whose name is john
         Thread.sleep(sleepInterval);
-        String response = discoveryService.searchByFullText("john");
+        String response = discoveryService.searchByFullText("john", queryParams);
         Assert.assertNotNull(response);
         JSONArray results = new JSONArray(response);
         Assert.assertEquals(results.length(), 1);
@@ -530,7 +532,7 @@ public class GraphBackedMetadataRepositoryTest {
         Assert.assertEquals(row.get("typeName"), "Person");
 
         //person in hr department who lives in santa clara
-        response = discoveryService.searchByFullText("Jane AND santa AND clara");
+        response = discoveryService.searchByFullText("Jane AND santa AND clara", queryParams);
         Assert.assertNotNull(response);
         results = new JSONArray(response);
         Assert.assertEquals(results.length(), 1);
@@ -538,7 +540,7 @@ public class GraphBackedMetadataRepositoryTest {
         Assert.assertEquals(row.get("typeName"), "Manager");
 
         //search for person in hr department whose name starts is john/jahn
-        response = discoveryService.searchByFullText("hr AND (john OR jahn)");
+        response = discoveryService.searchByFullText("hr AND (john OR jahn)", queryParams);
         Assert.assertNotNull(response);
         results = new JSONArray(response);
         Assert.assertEquals(results.length(), 1);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/085d5c86/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java b/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java
index f2a5b50..52dcfde 100644
--- a/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java
@@ -31,6 +31,7 @@ import org.apache.atlas.RequestContext;
 import org.apache.atlas.TestUtils;
 import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
 import org.apache.atlas.listener.EntityChangeListener;
+import org.apache.atlas.query.QueryParams;
 import org.apache.atlas.repository.audit.EntityAuditRepository;
 import org.apache.atlas.repository.audit.HBaseBasedAuditRepository;
 import org.apache.atlas.repository.audit.HBaseTestUtils;
@@ -230,8 +231,9 @@ public class DefaultMetadataServiceTest {
         assertReferenceableEquals(instance, entity);
 
         //Verify that search with reserved characters works - for string attribute
-        String responseJson = discoveryService.searchByDSL(
-                String.format("`%s` where `%s` = '%s'", typeDefinition.typeName, strAttrName, entity.get(strAttrName)));
+        String query =
+                String.format("`%s` where `%s` = '%s'", typeDefinition.typeName, strAttrName, entity.get(strAttrName));
+        String responseJson = discoveryService.searchByDSL(query, new QueryParams(1, 0));
         JSONObject response = new JSONObject(responseJson);
         assertEquals(response.getJSONArray("rows").length(), 1);
     }