Posted to commits@drill.apache.org by cg...@apache.org on 2022/01/06 20:06:24 UTC

[drill] branch master updated: DRILL-8092: Add Auto Pagination to HTTP Storage Plugin (#2414)

This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 31b1274  DRILL-8092: Add Auto Pagination to HTTP Storage Plugin (#2414)
31b1274 is described below

commit 31b1274b6beddd191bee9c1d0ab2c827296b0353
Author: Charles S. Givre <cg...@apache.org>
AuthorDate: Thu Jan 6 15:06:14 2022 -0500

    DRILL-8092: Add Auto Pagination to HTTP Storage Plugin (#2414)
---
 .../org/apache/drill/exec/store/xml/XMLReader.java |   3 +-
 contrib/storage-http/Pagination.md                 |  42 ++
 contrib/storage-http/README.md                     |  19 +-
 .../drill/exec/store/http/HttpApiConfig.java       |   9 +-
 .../drill/exec/store/http/HttpBatchReader.java     |  57 ++-
 .../drill/exec/store/http/HttpCSVBatchReader.java  |  24 +-
 .../drill/exec/store/http/HttpGroupScan.java       |   8 +-
 .../drill/exec/store/http/HttpPaginatorConfig.java | 168 ++++++++
 .../exec/store/http/HttpPushDownListener.java      |  14 +
 .../exec/store/http/HttpScanBatchCreator.java      |  76 +++-
 .../drill/exec/store/http/HttpStoragePlugin.java   |   2 +-
 .../drill/exec/store/http/HttpXMLBatchReader.java  |  36 +-
 .../exec/store/http/paginator/OffsetPaginator.java | 117 +++++
 .../exec/store/http/paginator/PagePaginator.java   | 105 +++++
 .../drill/exec/store/http/paginator/Paginator.java | 131 ++++++
 .../drill/exec/store/http/util/SimpleHttp.java     |  13 +-
 .../drill/exec/store/http/TestHttpPlugin.java      |  74 +++-
 .../drill/exec/store/http/TestPagination.java      | 479 +++++++++++++++++++++
 .../drill/exec/store/http/TestPaginator.java       |  92 ++++
 .../storage-http/src/test/resources/data/p1.json   |  10 +
 .../storage-http/src/test/resources/data/p2.json   |  10 +
 .../storage-http/src/test/resources/data/p3.json   |   6 +
 .../src/test/resources/data/response_1.xml         |  45 ++
 .../src/test/resources/data/response_2.csv         |   3 +
 .../src/test/resources/data/response_2.xml         |  45 ++
 .../src/test/resources/data/response_3.csv         |   3 +
 .../src/test/resources/data/response_3.xml         |  37 ++
 .../src/test/resources/data/response_4.csv         |   2 +
 .../drill/exec/store/ImplicitColumnUtils.java      |   2 +-
 .../store/easy/json/loader/JsonLoaderImpl.java     |   3 +
 30 files changed, 1605 insertions(+), 30 deletions(-)

diff --git a/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLReader.java b/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLReader.java
index dd66e1c..6b26eab 100644
--- a/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLReader.java
+++ b/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLReader.java
@@ -41,13 +41,14 @@ import javax.xml.stream.XMLStreamException;
 import javax.xml.stream.events.Attribute;
 import javax.xml.stream.events.StartElement;
 import javax.xml.stream.events.XMLEvent;
+import java.io.Closeable;
 import java.io.InputStream;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Stack;
 
-public class XMLReader {
+public class XMLReader implements Closeable {
   private static final Logger logger = LoggerFactory.getLogger(XMLReader.class);
   private static final String ATTRIBUTE_MAP_NAME = "attributes";
 
diff --git a/contrib/storage-http/Pagination.md b/contrib/storage-http/Pagination.md
new file mode 100644
index 0000000..d2206b2
--- /dev/null
+++ b/contrib/storage-http/Pagination.md
@@ -0,0 +1,42 @@
+# Auto Pagination in Drill
+Remote APIs frequently implement some form of pagination to limit results.  However, if you are performing bulk data analysis, it is necessary to reassemble the
+data into one larger dataset.  Drill's auto-pagination feature allows this to happen in the background, so that the user gets clean, complete data back.
+
+To use a paginator, simply configure it in the connection for the particular API.
+
+## Words of Caution
+While extremely powerful, the auto-pagination feature has the potential to run afoul of an API's rate limits and even to unintentionally DDoS an API.
+
+
+## Offset Pagination
+Offset pagination works much like SQL's `LIMIT` and `OFFSET` clauses.  For example, if you want 200 records and the page size is 50 records, the offset paginator will break your query into 4 requests, as shown below:
+
+* myapi.com?limit=50&offset=0
+* myapi.com?limit=50&offset=50
+* myapi.com?limit=50&offset=100
+* myapi.com?limit=50&offset=150
+
+### Configuring Offset Pagination
+To configure an offset paginator, simply add the following to the configuration for your connection. 
+
+```json
+"paginator": {
+   "limitParam": "<limit>",
+   "offsetParam": "<offset>",
+   "pageSize": 100,
+   "method": "OFFSET"
+}
+```
+
+## Page Pagination
+Page pagination is very similar to offset pagination, except that it uses a page number instead of an `OFFSET`.
+
+```json
+ "paginator": {
+        "pageParam": "page",
+        "pageSizeParam": "per_page",
+        "pageSize": 100,
+        "method": "PAGE"
+      }
+```
+In either case, the `pageSize` parameter should be set to the maximum page size allowed by the API.  This minimizes the number of requests Drill makes.
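
As a rough sketch of what that expansion looks like mechanically (a hedged illustration using okhttp3's `HttpUrl`, which the paginator classes below build on; the base URL and parameter names are assumptions):

```java
import okhttp3.HttpUrl;

public class OffsetExpansionSketch {
  public static void main(String[] args) {
    int limit = 200;    // from the query's LIMIT clause
    int pageSize = 50;  // the API's maximum page size

    // Base URL is illustrative only
    HttpUrl base = HttpUrl.parse("https://myapi.com/");

    // limit / pageSize = 4 requests, stepping the offset one page at a time
    for (int offset = 0; offset < limit; offset += pageSize) {
      HttpUrl url = base.newBuilder()
          .addQueryParameter("limit", String.valueOf(pageSize))
          .addQueryParameter("offset", String.valueOf(offset))
          .build();
      System.out.println(url);  // e.g. https://myapi.com/?limit=50&offset=0
    }
  }
}
```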
diff --git a/contrib/storage-http/README.md b/contrib/storage-http/README.md
index a287f9d..ab4058d 100644
--- a/contrib/storage-http/README.md
+++ b/contrib/storage-http/README.md
@@ -4,7 +4,7 @@ The HTTP storage plugin lets you query APIs over HTTP/REST. The plugin
 expects JSON responses.
 
 The HTTP plugin is new in Drill 1.18 and is an Alpha feature. It works well, and we
-enourage you to use it and provide feedback. However, we reserve the right to change
+encourage you to use it and provide feedback. However, we reserve the right to change
 the plugin based on that feedback.
 
 ## Configuration
@@ -186,7 +186,7 @@ If your query contains other conditions (`!=`, `<`, etc.) then those conditions
 in Drill after the REST service returns the data.
 
 Only fields listed in the `params` config field will become parameters; all other
-experssions are handled within Drill as explained above.
+expressions are handled within Drill as explained above.
 
 At present, Drill requires the values to be literals (constants). Drill does not
 currently allow expressions. That is, the following will not become an HTTP parameter:
@@ -278,6 +278,19 @@ If the `authType` is set to `basic`, `username` and `password` must be set in th
 
 `password`: The password for basic authentication.
 
+#### Limiting Results
+Some APIs support a query parameter which limits the number of results returned by the API.  In this case you can set the `limitQueryParam` config variable to that query parameter's name, and Drill will automatically include it in your request.  For instance, if you have an API which supports a limit query parameter called `maxRecords`, you set the above-mentioned config variable, and you execute the following query:
+  
+```sql
+SELECT <fields>
+FROM api.limitedApi
+LIMIT 10 
+```  
+Drill will send the following request to your API:
+```
+https://<api>?maxRecords=10
+```
+
 #### errorOn400
 When a user makes HTTP calls, the response code will be from 100-599.  400 series error codes can contain useful information and in some cases you would not want Drill to throw 
 errors on 400 series errors.  This option allows you to define Drill's behavior on 400 series error codes.  When set to `true`, Drill will throw an exception and halt execution 
@@ -599,7 +612,7 @@ If using a "tail" in the query, verify that the tail is quoted using back-ticks
 as shown in the examples.
 
 Check that the URL is correct. If not, check the plugin configuration properties
-described above to find out why the pieces were assembed as you want.
+described above to find out why the pieces were not assembled as you wanted.
 
 If the query works but delivers unexpected results, check the Drill log file.
 Drill logs a message like the following at the info level when opening the HTTP connection:
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java
index cca2767..87298b6 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java
@@ -58,7 +58,6 @@ public class HttpApiConfig {
 
   @JsonProperty
   private final String url;
-
   /**
    * Whether this API configuration represents a schema (with the
    * table providing additional parts of the URL), or if this
@@ -101,6 +100,8 @@ public class HttpApiConfig {
   @JsonProperty
   private final int xmlDataLevel;
   @JsonProperty
+  private final String limitQueryParam;
+  @JsonProperty
   private final boolean errorOn400;
 
   // Enables the user to configure JSON options at the connection level rather than globally.
@@ -112,6 +113,9 @@ public class HttpApiConfig {
   private final boolean verifySSLCert;
   @Getter(AccessLevel.NONE)
   private final CredentialsProvider credentialsProvider;
+  @JsonProperty
+  private final HttpPaginatorConfig paginator;
+
   @Getter(AccessLevel.NONE)
   protected boolean directCredentials;
 
@@ -172,6 +176,9 @@ public class HttpApiConfig {
     this.errorOn400 = builder.errorOn400;
     this.credentialsProvider = CredentialProviderUtils.getCredentialsProvider(builder.userName, builder.password, builder.credentialsProvider);
     this.directCredentials = builder.credentialsProvider == null;
+
+    this.limitQueryParam = builder.limitQueryParam;
+    this.paginator = builder.paginator;
   }
 
   @JsonProperty
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
index 59d14af..05d8c49 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
@@ -33,11 +33,13 @@ import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.store.easy.json.loader.JsonLoader;
 import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder;
 import org.apache.drill.exec.store.easy.json.loader.JsonLoaderOptions;
+import org.apache.drill.exec.store.http.paginator.Paginator;
 import org.apache.drill.exec.store.http.util.HttpProxyConfig;
 import org.apache.drill.exec.store.http.util.HttpProxyConfig.ProxyBuilder;
 import org.apache.drill.exec.store.http.util.SimpleHttp;
 import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 import org.apache.drill.exec.store.ImplicitColumnUtils.ImplicitColumns;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,13 +56,27 @@ public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
 
   private final HttpSubScan subScan;
   private final int maxRecords;
+  protected final Paginator paginator;
+  protected String baseUrl;
   private JsonLoader jsonLoader;
-  private int recordCount;
+  private ResultSetLoader resultSetLoader;
+
   protected ImplicitColumns implicitColumns;
 
+
   public HttpBatchReader(HttpSubScan subScan) {
     this.subScan = subScan;
     this.maxRecords = subScan.maxRecords();
+    this.baseUrl = subScan.tableSpec().connectionConfig().url();
+    this.paginator = null;
+  }
+
+  public HttpBatchReader(HttpSubScan subScan, Paginator paginator) {
+    this.subScan = subScan;
+    this.maxRecords = subScan.maxRecords();
+    this.paginator = paginator;
+    this.baseUrl = paginator.next();
+    logger.debug("Batch reader with URL: {}", this.baseUrl);
   }
 
   @Override
@@ -72,6 +88,7 @@ public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
         .getString(ExecConstants.DRILL_TMP_DIR);
 
     HttpUrl url = buildUrl();
+    logger.debug("Final URL: {}", url);
 
     CustomErrorContext errorContext = new ChildErrorContext(negotiator.parentErrorContext()) {
       @Override
@@ -92,11 +109,10 @@ public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
       .build();
 
     // JSON loader setup
-    ResultSetLoader loader = negotiator.build();
-
 
+    resultSetLoader = negotiator.build();
     if (implicitColumnsAreProjected()) {
-      implicitColumns = new ImplicitColumns(loader.writer());
+      implicitColumns = new ImplicitColumns(resultSetLoader.writer());
       buildImplicitColumns();
     }
 
@@ -106,13 +122,19 @@ public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
     try {
       JsonLoaderBuilder jsonBuilder = new JsonLoaderBuilder()
           .implicitFields(implicitColumns)
-          .resultSetLoader(loader)
+          .resultSetLoader(resultSetLoader)
+          .standardOptions(negotiator.queryOptions())
+          .maxRows(maxRecords)
           .dataPath(subScan.tableSpec().connectionConfig().dataPath())
           .errorContext(errorContext)
           .fromStream(inStream);
 
       if (subScan.tableSpec().connectionConfig().jsonOptions() != null) {
-        JsonLoaderOptions jsonOptions = subScan.tableSpec().connectionConfig().jsonOptions().getJsonOptions(negotiator.queryOptions());
+        JsonLoaderOptions jsonOptions = subScan
+          .tableSpec()
+          .connectionConfig()
+          .jsonOptions()
+          .getJsonOptions(negotiator.queryOptions());
         jsonBuilder.options(jsonOptions);
       } else {
         jsonBuilder.standardOptions(negotiator.queryOptions());
@@ -126,7 +148,7 @@ public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
       throw t;
     }
 
-    return true; // Please read the first batch
+    return true;
   }
 
   protected void buildImplicitColumns() {
@@ -163,8 +185,8 @@ public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
   }
 
   protected HttpUrl buildUrl() {
+    logger.debug("Building URL from {}", baseUrl);
     HttpApiConfig apiConfig = subScan.tableSpec().connectionConfig();
-    String baseUrl = apiConfig.url();
 
     // Append table name, if available.
     if (subScan.tableSpec().tableName() != null) {
@@ -180,6 +202,12 @@ public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
         subScan.filters() != null) {
       addFilters(urlBuilder, apiConfig.params(), subScan.filters());
     }
+
+    // Add limit parameter if defined and limit set
+    if (! Strings.isNullOrEmpty(apiConfig.limitQueryParam()) && maxRecords > 0) {
+      urlBuilder.addQueryParameter(apiConfig.limitQueryParam(), String.valueOf(maxRecords));
+    }
+
     return urlBuilder.build();
   }
 
@@ -189,6 +217,7 @@ public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
    */
   protected void addFilters(Builder urlBuilder, List<String> params,
       Map<String, String> filters) {
+
     for (String param : params) {
       String value = filters.get(param);
       if (value != null) {
@@ -216,13 +245,15 @@ public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
 
   @Override
   public boolean next() {
-    recordCount++;
+    boolean result = jsonLoader.readBatch();
 
-    // Stop after the limit has been reached
-    if (maxRecords >= 1 && recordCount > maxRecords) {
-      return false;
+    // Allows limitless pagination.
+    if (paginator != null &&
+      maxRecords < 0 && (resultSetLoader.totalRowCount()) < paginator.getPageSize()) {
+      logger.debug("Partially filled page received, ending pagination");
+      paginator.endPagination();
     }
-    return jsonLoader.readBatch();
+    return result;
   }
 
   @Override
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
index ae78252..481525d 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
@@ -33,9 +33,12 @@ import org.apache.drill.exec.physical.resultSet.RowSetLoader;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.store.ImplicitColumnUtils.ImplicitColumns;
+import org.apache.drill.exec.store.http.paginator.Paginator;
 import org.apache.drill.exec.store.http.util.SimpleHttp;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.InputStream;
@@ -52,7 +55,9 @@ public class HttpCSVBatchReader extends HttpBatchReader {
   private SchemaBuilder builder;
   private RowSetLoader rowWriter;
   private InputStream inStream;
+  private ResultSetLoader resultLoader;
 
+  private static final Logger logger = LoggerFactory.getLogger(HttpCSVBatchReader.class);
 
   public HttpCSVBatchReader(HttpSubScan subScan) {
     super(subScan);
@@ -63,6 +68,15 @@ public class HttpCSVBatchReader extends HttpBatchReader {
     csvSettings.setLineSeparatorDetectionEnabled(true);
   }
 
+  public HttpCSVBatchReader(HttpSubScan subScan, Paginator paginator) {
+    super(subScan, paginator);
+    this.subScan = subScan;
+    this.maxRecords = subScan.maxRecords();
+
+    this.csvSettings = new CsvParserSettings();
+    csvSettings.setLineSeparatorDetectionEnabled(true);
+  }
+
   @Override
   public boolean open(SchemaNegotiator negotiator) {
 
@@ -99,7 +113,7 @@ public class HttpCSVBatchReader extends HttpBatchReader {
     builder = new SchemaBuilder();
     TupleMetadata drillSchema = buildSchema();
     negotiator.tableSchema(drillSchema, true);
-    ResultSetLoader resultLoader = negotiator.build();
+    resultLoader = negotiator.build();
 
     // Add implicit columns
     if (implicitColumnsAreProjected()) {
@@ -121,7 +135,6 @@ public class HttpCSVBatchReader extends HttpBatchReader {
       if (rowWriter.limitReached(maxRecords)) {
         return false;
       }
-
       if (!processRow()) {
         return false;
       }
@@ -131,6 +144,7 @@ public class HttpCSVBatchReader extends HttpBatchReader {
 
   @Override
   public void close() {
+    logger.debug("Closing URL: {}", baseUrl);
     AutoCloseables.closeSilently(inStream);
   }
 
@@ -162,6 +176,12 @@ public class HttpCSVBatchReader extends HttpBatchReader {
   private boolean processRow() {
     String[] nextRow = csvReader.parseNext();
     if (nextRow == null) {
+
+      if (paginator != null &&
+        maxRecords < 0 && (resultLoader.totalRowCount()) < paginator.getPageSize()) {
+        logger.debug("Ending CSV pagination: Pages too small");
+        paginator.endPagination();
+      }
       return false;
     }
     rowWriter.start();
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
index d91d618..619858d 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
@@ -228,8 +228,14 @@ public class HttpGroupScan extends AbstractGroupScan {
           1E9, 1E112, 1E12);
     }
 
-    // No good estimates at all, just make up something.
+    // No good estimates at all, just make up something.  Make it smaller if there is a limit.
     double estRowCount = 10_000;
+    // If a limit was pushed down, use a smaller row-count estimate so that the limit is pushed down.
+    if (maxRecords > 0) {
+      estRowCount = Math.min(maxRecords, 10_000);
+      estRowCount = estRowCount / 2;
+    }
+
 
     // NOTE this was important! if the predicates don't make the query more
     // efficient they won't get pushed down
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPaginatorConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPaginatorConfig.java
new file mode 100644
index 0000000..cbacff8
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPaginatorConfig.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.http;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.exceptions.UserException;
+
+
+@Slf4j
+@Builder
+@Getter
+@Accessors(fluent = true)
+@EqualsAndHashCode
+@ToString
+@AllArgsConstructor
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
+@JsonDeserialize(builder = HttpPaginatorConfig.HttpPaginatorBuilder.class)
+public class HttpPaginatorConfig {
+
+  // For Offset Pagination
+  @JsonProperty
+  private final String limitParam;
+
+  @JsonProperty
+  private final String offsetParam;
+
+  // For Page Pagination
+  @JsonProperty
+  private final String pageParam;
+
+  @JsonProperty
+  private final String pageSizeParam;
+
+  @JsonProperty
+  private final int pageSize;
+
+  @JsonProperty
+  private final int maxRecords;
+
+  @JsonProperty
+  private final String method;
+
+  public enum PaginatorMethod {
+    OFFSET,
+    PAGE
+  }
+
+  private HttpPaginatorConfig(HttpPaginatorConfig.HttpPaginatorBuilder builder) {
+    this.limitParam = builder.limitParam;
+    this.offsetParam = builder.offsetParam;
+    this.pageSize = builder.pageSize;
+    this.pageParam = builder.pageParam;
+    this.pageSizeParam = builder.pageSizeParam;
+    this.maxRecords = builder.maxRecords;
+
+    this.method = StringUtils.isEmpty(builder.method)
+      ? PaginatorMethod.OFFSET.toString() : builder.method.trim().toUpperCase();
+
+    PaginatorMethod paginatorMethod = PaginatorMethod.valueOf(this.method);
+
+    /*
+     * For pagination to function, key fields must be defined.  This block validates
+     * the required fields for each type of paginator.
+     */
+    switch (paginatorMethod) {
+      case OFFSET:
+        if (StringUtils.isEmpty(this.limitParam) || StringUtils.isEmpty(this.offsetParam)) {
+          throw UserException
+            .validationError()
+            .message("Invalid paginator configuration.  For OFFSET pagination, limitField and offsetField must be defined.")
+            .build(logger);
+        } else if (this.pageSize <= 0) {
+          throw UserException
+            .validationError()
+            .message("Invalid paginator configuration.  For OFFSET pagination, maxPageSize must be defined and greater than zero.")
+            .build(logger);
+        }
+        break;
+      case PAGE:
+        if (StringUtils.isEmpty(this.pageParam) || StringUtils.isEmpty(this.pageSizeParam)) {
+          throw UserException
+            .validationError()
+            .message("Invalid paginator configuration.  For PAGE pagination, pageField and pageSizeField must be defined.")
+            .build(logger);
+        } else if (this.pageSize <= 0) {
+          throw UserException
+            .validationError()
+            .message("Invalid paginator configuration.  For PAGE pagination, maxPageSize must be defined and greater than zero.")
+            .build(logger);
+        }
+        break;
+      default:
+        throw UserException
+          .validationError()
+          .message("Invalid paginator method: %s.  Drill supports 'OFFSET' and 'PAGE'", method)
+          .build(logger);
+    }
+  }
+
+  @JsonIgnore
+  public PaginatorMethod getMethodType() {
+    return PaginatorMethod.valueOf(this.method.toUpperCase());
+  }
+
+  @JsonPOJOBuilder(withPrefix = "")
+  public static class HttpPaginatorBuilder {
+    @Getter
+    @Setter
+    public String limitParam;
+
+    @Getter
+    @Setter
+    public String offsetParam;
+
+    @Getter
+    @Setter
+    public int maxRecords;
+
+    @Getter
+    @Setter
+    public int pageSize;
+
+    @Getter
+    @Setter
+    public String pageParam;
+
+    @Getter
+    @Setter
+    public String pageSizeParam;
+
+    @Getter
+    @Setter
+    public String method;
+
+    public HttpPaginatorConfig build() {
+      return new HttpPaginatorConfig(this);
+    }
+  }
+}
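
As a usage sketch, the tests later in this commit construct this config through the Lombok-generated builder; a minimal, hedged example (the parameter names depend on the target API):

```java
import org.apache.drill.exec.store.http.HttpPaginatorConfig;

public class PaginatorConfigSketch {
  public static void main(String[] args) {
    // Builds an OFFSET paginator config; the private constructor validates
    // that the parameter names are set and that pageSize is positive.
    HttpPaginatorConfig config = HttpPaginatorConfig.builder()
        .limitParam("limit")
        .offsetParam("offset")
        .method("offset")  // trimmed and upper-cased; empty defaults to OFFSET
        .pageSize(100)
        .build();
    System.out.println(config.getMethodType());  // OFFSET
  }
}
```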
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPushDownListener.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPushDownListener.java
index 790c42b..c4f5596 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPushDownListener.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPushDownListener.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.store.base.filter.ExprNode.ColRelOpConstNode;
 import org.apache.drill.exec.store.base.filter.ExprNode.OrNode;
 import org.apache.drill.exec.store.base.filter.FilterPushDownListener;
 import org.apache.drill.exec.store.base.filter.FilterPushDownStrategy;
+import org.apache.drill.exec.store.http.HttpPaginatorConfig.PaginatorMethod;
 import org.apache.drill.exec.store.http.util.SimpleHttp;
 
 import java.util.Collections;
@@ -87,6 +88,7 @@ public class HttpPushDownListener implements FilterPushDownListener {
 
     HttpScanPushDownListener(HttpGroupScan groupScan) {
       this.groupScan = groupScan;
+
       // Add fields from config
       if (groupScan.getHttpConfig().params() != null) {
         for (String field : groupScan.getHttpConfig().params()) {
@@ -94,6 +96,18 @@ public class HttpPushDownListener implements FilterPushDownListener {
         }
       }
 
+      // Add fields from paginator, if present
+      HttpPaginatorConfig paginator = groupScan.getHttpConfig().paginator();
+      if (paginator != null) {
+        if (paginator.getMethodType() == PaginatorMethod.OFFSET) {
+          filterParams.put(paginator.limitParam(), paginator.limitParam());
+          filterParams.put(paginator.offsetParam(), paginator.offsetParam());
+        } else if (paginator.getMethodType() == PaginatorMethod.PAGE) {
+          filterParams.put(paginator.pageParam(), paginator.pageParam());
+          filterParams.put(paginator.pageSizeParam(), paginator.pageSizeParam());
+        }
+      }
+
       // Add fields from the URL path as denoted by {}
       HttpUrl url = HttpUrl.parse(groupScan.getHttpConfig().url());
       if (url != null) {
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
index 9ef55fc..e0e139f 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.http;
 
+import okhttp3.HttpUrl;
 import org.apache.drill.common.exceptions.ChildErrorContext;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
@@ -32,12 +33,20 @@ import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
 import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.http.HttpPaginatorConfig.PaginatorMethod;
+import org.apache.drill.exec.store.http.paginator.OffsetPaginator;
+import org.apache.drill.exec.store.http.paginator.PagePaginator;
+import org.apache.drill.exec.store.http.paginator.Paginator;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
 
 public class HttpScanBatchCreator implements BatchCreator<HttpSubScan> {
 
+  private static final Logger logger = LoggerFactory.getLogger(HttpScanBatchCreator.class);
+
   @Override
   public CloseableRecordBatch getBatch(ExecutorFragmentContext context, HttpSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
@@ -74,15 +83,60 @@ public class HttpScanBatchCreator implements BatchCreator<HttpSubScan> {
     ReaderFactory readerFactory = new HttpReaderFactory(subScan);
     builder.setReaderFactory(readerFactory);
     builder.nullType(Types.optional(MinorType.VARCHAR));
+
+    // TODO Add page size limit here to ScanFramework Builder
+
     return builder;
   }
 
   private static class HttpReaderFactory implements ReaderFactory {
     private final HttpSubScan subScan;
+    private final HttpPaginatorConfig paginatorConfig;
+    private Paginator paginator;
+
     private int count;
 
     public HttpReaderFactory(HttpSubScan subScan) {
       this.subScan = subScan;
+
+      paginatorConfig = subScan.tableSpec().connectionConfig().paginator();
+      if (paginatorConfig != null) {
+        // TODO Handle the case of no limit queries in pagination
+        logger.debug("Creating paginator using config: {}", paginatorConfig);
+
+        // Initialize the paginator and generate the base URLs
+        this.paginator = getPaginator();
+      }
+    }
+
+    private Paginator getPaginator() {
+      HttpUrl.Builder urlBuilder;
+      HttpUrl rawUrl = HttpUrl.parse(subScan.tableSpec().connectionConfig().url());
+
+      // If the URL is not parsable or otherwise invalid
+      if (rawUrl == null) {
+        throw UserException.validationError()
+          .message("Invalid URL: " + subScan.tableSpec().connectionConfig().url())
+          .build(logger);
+      }
+
+      urlBuilder = rawUrl.newBuilder();
+
+      Paginator paginator = null;
+      if (paginatorConfig.getMethodType() == PaginatorMethod.OFFSET) {
+        paginator = new OffsetPaginator(urlBuilder,
+          subScan.maxRecords(),
+          paginatorConfig.pageSize(),
+          paginatorConfig.limitParam(),
+          paginatorConfig.offsetParam());
+      } else if (paginatorConfig.getMethodType() == PaginatorMethod.PAGE) {
+        paginator = new PagePaginator(urlBuilder,
+          subScan.maxRecords(),
+          paginatorConfig.pageSize(),
+          paginatorConfig.pageParam(),
+          paginatorConfig.pageSizeParam());
+      }
+      return paginator;
     }
 
     @Override
@@ -90,12 +144,14 @@ public class HttpScanBatchCreator implements BatchCreator<HttpSubScan> {
 
     @Override
     public ManagedReader<SchemaNegotiator> next() {
+      logger.debug("Getting new batch reader.");
 
       // Get the expected input type
       String inputType = subScan.tableSpec().connectionConfig().inputType();
 
       // Only a single scan (in a single thread)
-      if (count++ == 0) {
+      if (count++ == 0 && paginatorConfig == null) {
+        // Case for no pagination
         if (inputType.equalsIgnoreCase(HttpApiConfig.CSV_INPUT_FORMAT)) {
           return new HttpCSVBatchReader(subScan);
         } else if (inputType.equalsIgnoreCase(HttpApiConfig.XML_INPUT_FORMAT)) {
@@ -103,7 +159,25 @@ public class HttpScanBatchCreator implements BatchCreator<HttpSubScan> {
         } else {
           return new HttpBatchReader(subScan);
         }
+      } else if (paginatorConfig != null) {
+        /*
+         * If the paginator is not null and generated a list of URLs, we create
+         * a new batch reader for each URL.  In the future, this could be parallelized in
+         * the group scan such that the calls could be sent to different drillbits.
+         */
+        if (!paginator.hasMore()) {
+          return null;
+        }
+
+        if (inputType.equalsIgnoreCase(HttpApiConfig.CSV_INPUT_FORMAT)) {
+          return new HttpCSVBatchReader(subScan, paginator);
+        } else if (inputType.equalsIgnoreCase(HttpApiConfig.XML_INPUT_FORMAT)) {
+          return new HttpXMLBatchReader(subScan, paginator);
+        } else {
+          return new HttpBatchReader(subScan, paginator);
+        }
       }
+      logger.debug("No new batch reader.");
       return null;
     }
   }
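
A simplified sketch of how the scan framework drains this factory when a paginator is present (`readerFactory` stands for the HttpReaderFactory above; the framework's real call sequence is more involved):

```java
// Simplified sketch: each call to next() yields a reader bound to the
// paginator's next URL; next() returns null once paginator.hasMore()
// turns false, which ends the scan.
ManagedReader<SchemaNegotiator> reader;
while ((reader = readerFactory.next()) != null) {
  // the framework opens the reader, reads its batches, then closes it
}
```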
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePlugin.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePlugin.java
index e5bcc7b..ab55f21 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePlugin.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePlugin.java
@@ -74,7 +74,7 @@ public class HttpStoragePlugin extends AbstractStoragePlugin {
     // existing plugins perform filter push-down at the physical
     // phase, which also works fine if push-down is independent of
     // parallelization.
-    if (FilterPushDownUtils.isFilterPushDownPhase(phase)) {
+    if (FilterPushDownUtils.isFilterPushDownPhase(phase) || phase == PlannerPhase.LOGICAL) {
       return HttpPushDownListener.rulesFor(optimizerContext);
     } else {
       return ImmutableSet.of();
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java
index e62f997..afc3dcc 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java
@@ -30,6 +30,7 @@ import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
 import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
 import org.apache.drill.exec.store.ImplicitColumnUtils.ImplicitColumns;
+import org.apache.drill.exec.store.http.paginator.Paginator;
 import org.apache.drill.exec.store.http.util.SimpleHttp;
 import org.apache.drill.exec.store.xml.XMLReader;
 
@@ -44,6 +45,7 @@ public class HttpXMLBatchReader extends HttpBatchReader {
   private final int dataLevel;
   private InputStream inStream;
   private XMLReader xmlReader;
+  private ResultSetLoader resultLoader;
 
   public HttpXMLBatchReader(HttpSubScan subScan) {
     super(subScan);
@@ -52,6 +54,14 @@ public class HttpXMLBatchReader extends HttpBatchReader {
     this.dataLevel = subScan.tableSpec().connectionConfig().xmlDataLevel();
   }
 
+
+  public HttpXMLBatchReader(HttpSubScan subScan, Paginator paginator) {
+    super(subScan, paginator);
+    this.subScan = subScan;
+    this.maxRecords = subScan.maxRecords();
+    this.dataLevel = subScan.tableSpec().connectionConfig().xmlDataLevel();
+  }
+
   @Override
   public boolean open(SchemaNegotiator negotiator) {
 
@@ -75,6 +85,7 @@ public class HttpXMLBatchReader extends HttpBatchReader {
       .scanDefn(subScan)
       .url(url)
       .tempDir(new File(tempDirPath))
+      .paginator(paginator)
       .proxyConfig(proxySettings(negotiator.drillConfig(), url))
       .errorContext(errorContext)
       .build();
@@ -84,7 +95,7 @@ public class HttpXMLBatchReader extends HttpBatchReader {
     // Initialize the XMLReader
     try {
       xmlReader = new XMLReader(inStream, dataLevel, maxRecords);
-      ResultSetLoader resultLoader = negotiator.build();
+      resultLoader = negotiator.build();
 
       if (implicitColumnsAreProjected()) {
         implicitColumns = new ImplicitColumns(resultLoader.writer());
@@ -107,12 +118,31 @@ public class HttpXMLBatchReader extends HttpBatchReader {
 
   @Override
   public boolean next() {
-    return xmlReader.next();
+    boolean result;
+    try {
+      result = xmlReader.next();
+    } catch (UserException e) {
+      // This covers the case of an empty XML response.  In that case we don't want to throw an
+      // exception; we simply stop reading.  Otherwise, rethrow the original exception.
+      if (e.getMessage().contains("EOF")) {
+        return false;
+      } else {
+        throw e;
+      }
+    }
+
+    if (paginator != null &&
+      maxRecords < 0 && (resultLoader.totalRowCount()) < paginator.getPageSize()) {
+      logger.debug("Ending XML pagination: Pages too small");
+      paginator.endPagination();
+    }
+
+    return result;
   }
 
   @Override
   public void close() {
     AutoCloseables.closeSilently(inStream);
-    xmlReader.close();
+    AutoCloseables.closeSilently(xmlReader);
   }
 }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/OffsetPaginator.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/OffsetPaginator.java
new file mode 100644
index 0000000..1e2663f
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/OffsetPaginator.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.http.paginator;
+
+import okhttp3.HttpUrl;
+import okhttp3.HttpUrl.Builder;
+import org.apache.drill.common.exceptions.UserException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class OffsetPaginator extends Paginator {
+
+  private static final Logger logger = LoggerFactory.getLogger(OffsetPaginator.class);
+
+  private final int limit;
+  private final String limitParam;
+  private final String offsetParam;
+  private int offset;
+
+  /**
+   * This class implements the idea of an Offset Paginator. See here for complete explanation:
+   * https://nordicapis.com/everything-you-need-to-know-about-api-pagination/
+   * <p>
+   *
+   * @param builder     The okhttp3 URL builder which has the API root URL
+   * @param limit       The limit clause from the sql query
+   * @param pageSize    The page size from the API documentation. To minimize requests, it should be set to the max that the API allows.
+   * @param limitParam  The field name which corresponds to the limit field from the API
+   * @param offsetParam The field name which corresponds to the offset field from the API
+   */
+  public OffsetPaginator(Builder builder, int limit, int pageSize, String limitParam, String offsetParam) {
+    super(builder, paginationMode.OFFSET, pageSize, limit > 0);
+
+    // Page size must be greater than zero; validate before building the
+    // URL list, which divides by the page size.
+    if (pageSize <= 0) {
+      throw UserException
+              .validationError()
+              .message("API page size must be a positive integer.")
+              .build(logger);
+    }
+
+    this.limit = limit;
+    this.limitParam = limitParam;
+    this.offsetParam = offsetParam;
+    this.offset = 0;
+    this.paginatedUrls = buildPaginatedURLs();
+  }
+
+  public int getLimit() {
+    return limit;
+  }
+
+  @Override
+  public String next() {
+    if (hasLimit) {
+      return super.next();
+    } else {
+      return generateNextUrl();
+    }
+  }
+
+  @Override
+  public String generateNextUrl() {
+    builder.removeAllEncodedQueryParameters(offsetParam);
+    builder.removeAllEncodedQueryParameters(limitParam);
+
+    builder.addQueryParameter(offsetParam, String.valueOf(offset));
+    builder.addQueryParameter(limitParam, String.valueOf(pageSize));
+    offset += pageSize;
+
+    return builder.build().url().toString();
+  }
+
+
+  /**
+   * Build the paginated URLs, one request per page of results.
+   *
+   * @return List of paginated URLs
+   */
+  @Override
+  public List<HttpUrl> buildPaginatedURLs() {
+    paginatedUrls = new ArrayList<>();
+    // If user wants 1000 records, and the page size is 100, we need to send 10 requests
+    int requestedPages = limit / pageSize;
+
+    for (int i = 0; i < requestedPages; i++) {
+      // Clear out old params
+      builder.removeAllEncodedQueryParameters(offsetParam);
+      builder.removeAllEncodedQueryParameters(limitParam);
+
+      builder.addQueryParameter(offsetParam, String.valueOf(offset));
+      builder.addQueryParameter(limitParam, String.valueOf(pageSize));
+      offset += pageSize;
+      paginatedUrls.add(builder.build());
+    }
+
+    return paginatedUrls;
+  }
+}
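
A hedged usage sketch, assuming a pushed-down `LIMIT 200` and a page size of 50 (the base URL is illustrative):

```java
import okhttp3.HttpUrl;
import org.apache.drill.exec.store.http.paginator.OffsetPaginator;

public class OffsetPaginatorSketch {
  public static void main(String[] args) {
    HttpUrl.Builder urlBuilder = HttpUrl.parse("https://myapi.com/").newBuilder();

    // limit 200 / pageSize 50 pre-computes four URLs in buildPaginatedURLs()
    OffsetPaginator paginator = new OffsetPaginator(urlBuilder, 200, 50, "limit", "offset");
    while (paginator.hasMore()) {
      // .../?offset=0&limit=50, .../?offset=50&limit=50, ...
      System.out.println(paginator.next());
    }
  }
}
```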
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/PagePaginator.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/PagePaginator.java
new file mode 100644
index 0000000..f42a959
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/PagePaginator.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.http.paginator;
+
+import okhttp3.HttpUrl;
+import okhttp3.HttpUrl.Builder;
+import org.apache.drill.common.exceptions.UserException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class PagePaginator extends Paginator {
+
+  private static final Logger logger = LoggerFactory.getLogger(PagePaginator.class);
+
+  private final int limit;
+  private final String pageParam;
+  private final String pageSizeParam;
+  private int currentPage;
+
+
+  /**
+   * The Page Paginator works similarly to the offset paginator.  It requires the user to supply a page number query
+   * parameter, a page size variable, and a maximum page size.
+   * @param builder The okHttp3 Request builder
+   * @param limit The maximum number of results the user wants, passed down from the batch reader
+   * @param pageSize The number of results per page.
+   * @param pageParam The API Query parameter which indicates which page number the user is requesting
+   * @param pageSizeParam The API Query parameter which specifies how many results per page
+   */
+  public PagePaginator(Builder builder, int limit, int pageSize, String pageParam, String pageSizeParam) {
+    super(builder, paginationMode.PAGE, pageSize, limit > 0);
+
+    // Page size must be greater than zero; validate before building the
+    // URL list, which divides by the page size.
+    if (pageSize <= 0) {
+      throw UserException
+        .validationError()
+        .message("API page size must be a positive integer.")
+        .build(logger);
+    }
+
+    this.limit = limit;
+    this.pageParam = pageParam;
+    this.pageSizeParam = pageSizeParam;
+    currentPage = 1;
+    this.paginatedUrls = buildPaginatedURLs();
+  }
+
+  @Override
+  public String next() {
+    if (hasLimit) {
+      return super.next();
+    } else {
+      return generateNextUrl();
+    }
+  }
+
+  @Override
+  public String generateNextUrl() {
+    builder.removeAllEncodedQueryParameters(pageParam);
+    builder.removeAllEncodedQueryParameters(pageSizeParam);
+
+    builder.addQueryParameter(pageParam, String.valueOf(currentPage));
+    builder.addQueryParameter(pageSizeParam, String.valueOf(pageSize));
+    currentPage++;
+
+    return builder.build().url().toString();
+  }
+
+
+  @Override
+  public List<HttpUrl> buildPaginatedURLs() {
+    this.paginatedUrls = new ArrayList<>();
+    int pageCount = Math.max(1, (limit / pageSize));
+
+    for (int i = 1; i <= pageCount; i++) {
+      // Clear out old params
+      builder.removeAllEncodedQueryParameters(pageParam);
+      builder.removeAllEncodedQueryParameters(pageSizeParam);
+
+      builder.addQueryParameter(pageParam, String.valueOf(i));
+      builder.addQueryParameter(pageSizeParam, String.valueOf(pageSize));
+      paginatedUrls.add(builder.build());
+    }
+
+    return paginatedUrls;
+  }
+}
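
For contrast, a similar hedged sketch of the page variant: with `LIMIT 6` and a page size of 2, it emits three URLs with 1-based page numbers (base URL and parameter names are illustrative):

```java
import okhttp3.HttpUrl;
import org.apache.drill.exec.store.http.paginator.PagePaginator;

public class PagePaginatorSketch {
  public static void main(String[] args) {
    HttpUrl.Builder urlBuilder = HttpUrl.parse("https://myapi.com/").newBuilder();

    PagePaginator paginator = new PagePaginator(urlBuilder, 6, 2, "page", "per_page");
    while (paginator.hasMore()) {
      // .../?page=1&per_page=2, .../?page=2&per_page=2, .../?page=3&per_page=2
      System.out.println(paginator.next());
    }
  }
}
```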
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/Paginator.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/Paginator.java
new file mode 100644
index 0000000..1106c8c
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/Paginator.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.http.paginator;
+
+import okhttp3.HttpUrl;
+import okhttp3.HttpUrl.Builder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * This class is the base abstraction for paginators.  There are
+ * different pagination methods; however, all essentially require the query
+ * engine to generate URLs to retrieve the next batch of data and
+ * to determine whether more data remains.
+ *
+ * The Offset and Page paginators work either with a limit or without, but function
+ * slightly differently.  If a limit is specified and pushed down, the paginator will
+ * generate a list of URLs with the appropriate pagination parameters.  In the future
+ * this could be parallelized; however, in V1 all these requests are run in series.
+ */
+public abstract class Paginator {
+
+  private static final Logger logger = LoggerFactory.getLogger(Paginator.class);
+  private static final int MAX_ATTEMPTS = 100;
+  protected final int pageSize;
+  private boolean hasMore;
+
+  public enum paginationMode {
+    OFFSET,
+    PAGE
+  }
+
+  protected final boolean hasLimit;
+  protected final paginationMode MODE;
+  protected int index;
+  protected Builder builder;
+  protected List<HttpUrl> paginatedUrls;
+
+
+  public Paginator(Builder builder, paginationMode mode, int pageSize, boolean hasLimit) {
+    this.MODE = mode;
+    this.builder = builder;
+    this.pageSize = pageSize;
+    this.hasLimit = hasLimit;
+    hasMore = true;
+    index = 0;
+  }
+
+  public void setBuilder(Builder builder) {
+    this.builder = builder;
+  }
+
+  public abstract List<HttpUrl> buildPaginatedURLs();
+
+  /**
+   * This method is to be used when the user does not include a limit in the query.
+   * Each paginator tracks whether there is more data; when there is no
+   * more data, the paginator sets the hasMore variable to false.
+   * @return The URL of the next page to fetch
+   */
+  public abstract String generateNextUrl();
+
+  public List<HttpUrl> getPaginatedUrls() {
+    return this.paginatedUrls;
+  }
+
+  /**
+   * This method is used in pagination queries when no limit is present.  The intended
+   * flow is that if no limit is present, if the batch reader encounters a page which has
+   * less data than the page size, the batch reader should call this method to stop
+   * the Paginator from generating additional URLs to call.
+   *
+   * In the event that the API simply runs out of data, the reader will return false anyway
+   * and the pagination will stop.
+   */
+  public void endPagination() {
+    hasMore = false;
+  }
+
+  public int getPageSize() { return pageSize; }
+
+  public String next() {
+    if (!hasMore()) {
+      return null;
+    }
+    String url = paginatedUrls.get(index).toString();
+    index++;
+    if (index >= paginatedUrls.size()) {
+      hasMore = false;
+    }
+    return url;
+  }
+
+  /**
+   * Returns true if the paginator has more pages, false if not.
+   * @return True if there are more pages to visit, false if not.
+   */
+  public boolean hasMore() {
+    return hasMore;
+  }
+
+  /**
+   * Returns the count of URLs generated.  Only meaningful for OFFSET paginator.
+   * @return The count of pages.
+   */
+  public int count() {
+    if (paginatedUrls == null) {
+      return 0;
+    } else {
+      return paginatedUrls.size();
+    }
+  }
+}
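
A sketch of the no-limit flow described in the Javadoc above; `fetchPage` is a hypothetical helper standing in for the HTTP call the batch readers make:

```java
// When no LIMIT is pushed down, URLs are generated one at a time and the
// caller ends pagination on the first short page.  fetchPage() is a
// hypothetical stand-in for the HTTP request, returning the rows read.
static void drainWithoutLimit(Paginator paginator) {
  while (paginator.hasMore()) {
    String url = paginator.generateNextUrl();
    int rowsRead = fetchPage(url);  // hypothetical
    if (rowsRead < paginator.getPageSize()) {
      paginator.endPagination();    // short page => last page
    }
  }
}
```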
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
index 7b0b3d6..af40e7b 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
@@ -36,6 +36,7 @@ import org.apache.drill.exec.store.http.HttpApiConfig;
 import org.apache.drill.exec.store.http.HttpApiConfig.HttpMethod;
 import org.apache.drill.exec.store.http.HttpStoragePluginConfig;
 import org.apache.drill.exec.store.http.HttpSubScan;
+import org.apache.drill.exec.store.http.paginator.Paginator;
 import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 import org.jetbrains.annotations.NotNull;
 
@@ -77,21 +78,24 @@ public class SimpleHttp {
   private final File tempDir;
   private final HttpProxyConfig proxyConfig;
   private final CustomErrorContext errorContext;
+  private final Paginator paginator;
   private final HttpUrl url;
   private String responseMessage;
   private int responseCode;
   private String responseProtocol;
   private String responseURL;
 
+
   @lombok.Builder
   public SimpleHttp(HttpSubScan scanDefn, HttpUrl url, File tempDir,
-    HttpProxyConfig proxyConfig, CustomErrorContext errorContext) {
+    HttpProxyConfig proxyConfig, CustomErrorContext errorContext, Paginator paginator) {
     this.scanDefn = scanDefn;
     this.url = url;
     this.tempDir = tempDir;
     this.proxyConfig = proxyConfig;
     this.errorContext = errorContext;
     this.client = setupHttpClient();
+    this.paginator = paginator;
   }
 
   /**
@@ -236,6 +240,13 @@ public class SimpleHttp {
       responseProtocol = response.protocol().toString();
       responseURL = response.request().url().toString();
 
+      // Case for pagination without limit
+      if (paginator != null && (
+        response.code() != 200 || response.body() == null ||
+        response.body().contentLength() == 0)) {
+        paginator.endPagination();
+      }
+
       // If the request is unsuccessful, throw a UserException
       if (!isSuccessful(responseCode)) {
         throw UserException
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
index ae4ac13..64305f2 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
@@ -22,6 +22,7 @@ import okhttp3.mockwebserver.MockResponse;
 import okhttp3.mockwebserver.MockWebServer;
 import okhttp3.mockwebserver.RecordedRequest;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -30,7 +31,6 @@ import org.apache.drill.exec.physical.rowSet.RowSet;
 import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
@@ -183,6 +183,22 @@ public class TestHttpPlugin extends ClusterTest {
       .postBody("key1=value1\nkey2=value2")
       .build();
 
+    HttpPaginatorConfig offsetPaginatorForJson = HttpPaginatorConfig.builder()
+      .limitParam("limit")
+      .offsetParam("offset")
+      .method("offset")
+      .pageSize(2)
+      .build();
+
+    HttpApiConfig mockJsonConfigWithPaginator = HttpApiConfig.builder()
+      .url("http://localhost:8091/json")
+      .method("get")
+      .headers(headers)
+      .requireTail(false)
+      .paginator(offsetPaginatorForJson)
+      .inputType("json")
+      .build();
+
     HttpApiConfig mockPostConfigWithoutPostBody = HttpApiConfig.builder()
       .url("http://localhost:8091/")
       .method("POST")
@@ -200,6 +216,15 @@ public class TestHttpPlugin extends ClusterTest {
       .inputType("csv")
       .build();
 
+    HttpApiConfig mockCsvConfigWithPaginator = HttpApiConfig.builder()
+      .url("http://localhost:8091/csv")
+      .method("get")
+      .paginator(offsetPaginatorForJson)
+      .inputType("csv")
+      .requireTail(false)
+      .dataPath("results")
+      .build();
+
     HttpApiConfig mockXmlConfig = HttpApiConfig.builder()
       .url("http://localhost:8091/xml")
       .method("GET")
@@ -251,6 +276,8 @@ public class TestHttpPlugin extends ClusterTest {
       .build();
 
     Map<String, HttpApiConfig> configs = new HashMap<>();
+    configs.put("csv_paginator", mockCsvConfigWithPaginator);
+    configs.put("json_paginator", mockJsonConfigWithPaginator);
     configs.put("sunrise", mockSchema);
     configs.put("mocktable", mockTable);
     configs.put("mockpost", mockPostConfig);
@@ -640,7 +667,23 @@ public class TestHttpPlugin extends ClusterTest {
   }
 
   @Test
-   public void testSerDe() throws Exception {
+  public void testSerDeCSV() throws Exception {
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(
+        new MockResponse().setResponseCode(200)
+          .setBody(TEST_CSV_RESPONSE)
+      );
+
+      String sql = "SELECT COUNT(*) FROM local.mockcsv.`csv?arg1=4` ";
+      String plan = queryBuilder().sql(sql).explainJson();
+      long cnt = queryBuilder().physical(plan).singletonLong();
+      assertEquals("Counts should match", 2L, cnt);
+    }
+  }
+
+  @Test
+  public void testSerDe() throws Exception {
     try (MockWebServer server = startServer()) {
 
       server.enqueue(
@@ -959,6 +1002,17 @@ public class TestHttpPlugin extends ClusterTest {
   }
 
   @Test
+  public void testLimitPushdownWithFilter() throws Exception {
+    String sql = "SELECT sunrise, sunset FROM live.sunrise2 WHERE `date`='2019-10-02' LIMIT 5";
+
+    queryBuilder()
+      .sql(sql)
+      .planMatcher()
+      .include("Limit", "maxRecords=5", "filters=\\{date=2019-10-02\\}")
+      .match();
+  }
+
+  @Test
   public void testSlowResponse() throws Exception {
     try (MockWebServer server = startServer()) {
 
@@ -995,6 +1049,22 @@ public class TestHttpPlugin extends ClusterTest {
     }
   }
 
+  @Test
+  public void testZeroByteResponseFromCSV() throws Exception {
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(
+        new MockResponse().setResponseCode(200)
+          .setBody("")
+      );
+
+      String sql = "SELECT * FROM local.mockcsv.`csv?arg1=4`";
+
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+      assertNull(results);
+    }
+  }
+
   // The connection expects a response object of the form
   // { results: { ... } }, but there is no such object, which
   // is treated as a null (no data, no schema) result set.
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java
new file mode 100644
index 0000000..2ee6ba5
--- /dev/null
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.http;
+
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.util.DrillFileUtils;
+import org.apache.drill.exec.physical.rowSet.DirectRowSet;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.drill.shaded.guava.com.google.common.io.Files;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryRowSetIterator;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestPagination extends ClusterTest {
+  private static final int MOCK_SERVER_PORT = 8092;
+  private static String TEST_CSV_RESPONSE;
+  private static String TEST_CSV_RESPONSE_2;
+  private static String TEST_CSV_RESPONSE_3;
+  private static String TEST_CSV_RESPONSE_4;
+  private static String TEST_JSON_PAGE1;
+  private static String TEST_JSON_PAGE2;
+  private static String TEST_JSON_PAGE3;
+  private static String TEST_XML_PAGE1;
+  private static String TEST_XML_PAGE2;
+  private static String TEST_XML_PAGE3;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+
+    TEST_CSV_RESPONSE = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/response.csv"), Charsets.UTF_8).read();
+    TEST_CSV_RESPONSE_2 = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/response_2.csv"), Charsets.UTF_8).read();
+    TEST_CSV_RESPONSE_3 = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/response_3.csv"), Charsets.UTF_8).read();
+    TEST_CSV_RESPONSE_4 = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/response_4.csv"), Charsets.UTF_8).read();
+
+    TEST_JSON_PAGE1 = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/p1.json"), Charsets.UTF_8).read();
+    TEST_JSON_PAGE2 = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/p2.json"), Charsets.UTF_8).read();
+    TEST_JSON_PAGE3 = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/p3.json"), Charsets.UTF_8).read();
+
+    TEST_XML_PAGE1 = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/response_1.xml"), Charsets.UTF_8).read();
+    TEST_XML_PAGE2 = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/response_2.xml"), Charsets.UTF_8).read();
+    TEST_XML_PAGE3 = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/response_3.xml"), Charsets.UTF_8).read();
+
+    dirTestWatcher.copyResourceToRoot(Paths.get("data/"));
+    makeMockConfig(cluster);
+    makeLiveConfig(cluster);
+  }
+
+  /**
+   * Create configs against live external servers. These must be tested manually
+   * and are subject to the whims of the external site. The timeout is 10 seconds
+   * to allow for real-world delays.
+   */
+  public static void makeLiveConfig(ClusterFixture cluster) {
+
+    Map<String, String> uaHeaders = new HashMap<>();
+    uaHeaders.put("User-Agent",  "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36");
+
+    HttpPaginatorConfig githubPagePaginator = HttpPaginatorConfig.builder()
+      .pageParam("page")
+      .pageSizeParam("per_page")
+      .pageSize(5)
+      .method("PAGE")
+      .build();
+
+    HttpApiConfig githubConfig = HttpApiConfig.builder()
+      .url("https://api.github.com/orgs/{org}/repos")
+      .method("get")
+      .requireTail(false)
+      .headers(uaHeaders)
+      .inputType("json")
+      .paginator(githubPagePaginator)
+      .build();
+
+    Map<String, HttpApiConfig> configs = new HashMap<>();
+    configs.put("github", githubConfig);
+
+    HttpStoragePluginConfig mockStorageConfigWithWorkspace =
+      new HttpStoragePluginConfig(false, configs, 10, "", 80, "", "", "", PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+    mockStorageConfigWithWorkspace.setEnabled(true);
+    cluster.defineStoragePlugin("live", mockStorageConfigWithWorkspace);
+  }
+
+  /**
+   * Create configs for an in-process mock server. Used for normal automated unit
+   * testing. Timeout is short to allow for timeout testing. The mock server is
+   * useful, but won't catch bugs related to real-world server glitches.
+   */
+  public static void makeMockConfig(ClusterFixture cluster) {
+
+    Map<String, String> headers = new HashMap<>();
+    headers.put("header1", "value1");
+    headers.put("header2", "value2");
+
+    HttpPaginatorConfig offsetPaginatorForJson = HttpPaginatorConfig.builder()
+      .limitParam("limit")
+      .offsetParam("offset")
+      .method("offset")
+      .pageSize(2)
+      .build();
+
+    HttpApiConfig mockJsonConfigWithPaginator = HttpApiConfig.builder()
+      .url("http://localhost:8092/json")
+      .method("get")
+      .headers(headers)
+      .requireTail(false)
+      .paginator(offsetPaginatorForJson)
+      .inputType("json")
+      .build();
+
+    HttpPaginatorConfig pagePaginatorForXML = HttpPaginatorConfig.builder()
+      .method("page")
+      .pageParam("page")
+      .pageSizeParam("pageSize")
+      .pageSize(3)
+      .build();
+
+    List<String> params = new ArrayList<>();
+    params.add("foo");
+
+    HttpApiConfig mockXmlConfigWithPaginator = HttpApiConfig.builder()
+      .url("http://localhost:8092/xml")
+      .method("GET")
+      .requireTail(false)
+      .params(params)
+      .paginator(pagePaginatorForXML)
+      .inputType("xml")
+      .xmlDataLevel(2)
+      .build();
+
+    HttpApiConfig mockXmlConfigWithPaginatorAndUrlParams = HttpApiConfig.builder()
+      .url("http://localhost:8092/xml/{org}")
+      .method("GET")
+      .requireTail(false)
+      .params(params)
+      .paginator(pagePaginatorForXML)
+      .inputType("xml")
+      .xmlDataLevel(2)
+      .build();
+
+    HttpApiConfig mockCsvConfigWithPaginator = HttpApiConfig.builder()
+      .url("http://localhost:8092/csv")
+      .method("get")
+      .paginator(offsetPaginatorForJson)
+      .inputType("csv")
+      .requireTail(false)
+      .dataPath("results")
+      .build();
+
+    Map<String, HttpApiConfig> configs = new HashMap<>();
+    configs.put("csv_paginator", mockCsvConfigWithPaginator);
+    configs.put("json_paginator", mockJsonConfigWithPaginator);
+    configs.put("xml_paginator", mockXmlConfigWithPaginator);
+    configs.put("xml_paginator_url_params", mockXmlConfigWithPaginatorAndUrlParams);
+
+    HttpStoragePluginConfig mockStorageConfigWithWorkspace =
+      new HttpStoragePluginConfig(false, configs, 2, "", 80, "", "", "", PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+    mockStorageConfigWithWorkspace.setEnabled(true);
+    cluster.defineStoragePlugin("local", mockStorageConfigWithWorkspace);
+  }
+
+  @Test
+  @Ignore("Requires Live Connection to Github")
+  public void testPagePaginationWithURLParameters() throws Exception {
+    String sql = "SELECT * FROM live.github WHERE org='apache' LIMIT 15";
+    List<QueryDataBatch> results = client.queryBuilder().sql(sql).results();
+
+    int count = 0;
+    for (QueryDataBatch b : results) {
+      count += b.getHeader().getRowCount();
+      b.release();
+    }
+    assertEquals(3, results.size());
+    assertEquals(15, count);
+  }
+
+  @Test
+  public void simpleJSONPaginatorQuery() throws Exception {
+    String sql = "SELECT * FROM `local`.`json_paginator` LIMIT 4";
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE1));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE2));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE3));
+
+      List<QueryDataBatch> results = client.queryBuilder()
+        .sql(sql)
+        .results();
+
+      int count = 0;
+      for (QueryDataBatch b : results) {
+        count += b.getHeader().getRowCount();
+        b.release();
+      }
+      assertEquals(2, results.size());
+      assertEquals(4, count);
+    }
+  }
+
+  @Test
+  public void simpleJSONPaginatorQueryWithoutLimit() throws Exception {
+    String sql = "SELECT * FROM `local`.`json_paginator`";
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE1));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE2));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE3));
+
+      List<QueryDataBatch> results = client.queryBuilder()
+        .sql(sql)
+        .results();
+
+      int count = 0;
+      for (QueryDataBatch b : results) {
+        count += b.getHeader().getRowCount();
+        b.release();
+      }
+      assertEquals(3, results.size());
+      assertEquals(5, count);
+    }
+  }
+
+  @Test
+  public void simpleJSONPaginatorQueryWithoutLimitAndEvenResults() throws Exception {
+    String sql = "SELECT * FROM `local`.`json_paginator`";
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE1));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE2));
+      server.enqueue(new MockResponse().setResponseCode(404).setBody(""));
+
+      List<QueryDataBatch> results = client.queryBuilder()
+        .sql(sql)
+        .results();
+
+      int count = 0;
+      for (QueryDataBatch b : results) {
+        count += b.getHeader().getRowCount();
+        b.release();
+      }
+      assertEquals(2, results.size());
+      assertEquals(4, count);
+    }
+  }
+
+  @Test
+  public void simpleCSVPaginatorQuery() throws Exception {
+    String sql = "SELECT * FROM `local`.`csv_paginator` LIMIT 6";
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_CSV_RESPONSE));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_CSV_RESPONSE_2));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_CSV_RESPONSE_3));
+
+      List<QueryDataBatch> results = client.queryBuilder()
+        .sql(sql)
+        .results();
+
+      int count = 0;
+      for (QueryDataBatch b : results) {
+        count += b.getHeader().getRowCount();
+        b.release();
+      }
+      assertEquals(6, count);
+    }
+  }
+
+  @Test
+  public void simpleCSVPaginatorQueryWithoutLimit() throws Exception {
+    String sql = "SELECT * FROM `local`.`csv_paginator`";
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_CSV_RESPONSE));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_CSV_RESPONSE_2));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_CSV_RESPONSE_3));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_CSV_RESPONSE_4));
+
+      List<QueryDataBatch> results = client.queryBuilder()
+        .sql(sql)
+        .results();
+
+      int count = 0;
+      for (QueryDataBatch b : results) {
+        count += b.getHeader().getRowCount();
+        b.release();
+      }
+      assertEquals(7, count);
+    }
+  }
+
+  @Test
+  public void simpleCSVPaginatorQueryWithoutLimitAndEvenResults() throws Exception {
+    String sql = "SELECT * FROM `local`.`csv_paginator`";
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_CSV_RESPONSE));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_CSV_RESPONSE_2));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_CSV_RESPONSE_3));
+      server.enqueue(new MockResponse().setResponseCode(404).setBody(""));
+
+      List<QueryDataBatch> results = client.queryBuilder()
+        .sql(sql)
+        .results();
+
+      int count = 0;
+      for (QueryDataBatch b : results) {
+        count += b.getHeader().getRowCount();
+        b.release();
+      }
+      assertEquals(6, count);
+    }
+  }
+
+  @Test
+  public void simpleXMLPaginatorQuery() throws Exception {
+    String sql = "SELECT * FROM `local`.`xml_paginator` LIMIT 6";
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_XML_PAGE1));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_XML_PAGE2));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_XML_PAGE3));
+
+      List<QueryDataBatch> results = client.queryBuilder()
+        .sql(sql)
+        .results();
+
+      int count = 0;
+      for (QueryDataBatch b : results) {
+        count += b.getHeader().getRowCount();
+        b.release();
+      }
+      assertEquals(2, results.size());
+      assertEquals(6, count);
+    }
+  }
+
+  @Test
+  public void simpleXMLPaginatorQueryWithoutLimit() throws Exception {
+    String sql = "SELECT * FROM `local`.`xml_paginator`";
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_XML_PAGE1));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_XML_PAGE2));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_XML_PAGE3));
+
+      List<QueryDataBatch> results = client.queryBuilder()
+        .sql(sql)
+        .results();
+
+      int count = 0;
+      for (QueryDataBatch b : results) {
+        count += b.getHeader().getRowCount();
+        b.release();
+      }
+      assertEquals(3, results.size());
+      assertEquals(8, count);
+    }
+  }
+
+  @Test
+  public void testAggregateQuery() throws Exception {
+    // Note that because the data arrives in multiple batches, we have to
+    // receive and parse each batch in order to access its contents. This is
+    // the case even for aggregate queries.
+
+    String sql = "SELECT ZONE, COUNT(*) AS row_count FROM `local`.`xml_paginator` GROUP BY ZONE";
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_XML_PAGE1));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_XML_PAGE2));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_XML_PAGE3));
+
+      QueryRowSetIterator iterator = client.queryBuilder().sql(sql).rowSetIterator();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+        .addNullable("ZONE", MinorType.VARCHAR)
+        .add("row_count", MinorType.BIGINT)
+        .build();
+
+      RowSet expectedFirstRow = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow("4", 5)
+        .build();
+
+      RowSet expectedSecondRow = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow("3", 3)
+        .build();
+
+      int count = 0;
+
+      while (iterator.hasNext()) {
+        DirectRowSet results = iterator.next();
+        if (results.rowCount() > 0) {
+          if (count == 0) {
+            RowSetUtilities.verify(expectedFirstRow, results);
+          } else if (count == 1) {
+            RowSetUtilities.verify(expectedSecondRow, results);
+          }
+          count++;
+        }
+      }
+    }
+  }
+
+  @Test
+  public void simpleXMLPaginatorQueryWithoutLimitAndEvenResults() throws Exception {
+    String sql = "SELECT * FROM `local`.`xml_paginator`";
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_XML_PAGE1));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_XML_PAGE2));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(""));
+      List<QueryDataBatch> results = client.queryBuilder()
+        .sql(sql)
+        .results();
+
+      int count = 0;
+      for (QueryDataBatch b : results) {
+        count += b.getHeader().getRowCount();
+        b.release();
+      }
+
+      // Expect two batches with a total of six records.
+      assertEquals(2, results.size());
+      assertEquals(6, count);
+    }
+  }
+
+  /**
+   * Helper method to start the MockWebServer.
+   * @return the started mock server
+   * @throws IOException if the server cannot start
+   */
+  public MockWebServer startServer() throws IOException {
+    MockWebServer server = new MockWebServer();
+    server.start(MOCK_SERVER_PORT);
+    return server;
+  }
+}
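For reference, the xml_paginator connection above pages by number: with pageSize 3, the
LIMIT 6 query needs exactly two pages, which is why simpleXMLPaginatorQuery expects two
batches, while the unlimited queries keep requesting pages until a short or empty page
arrives. A hedged sketch of the URL sequence the page paginator produces, based on the
constructor used in TestPaginator below; the exact query strings and the 1-based page
numbering are inferred, not asserted by the tests:

    import okhttp3.HttpUrl;
    import org.apache.drill.exec.store.http.paginator.PagePaginator;

    HttpUrl.Builder builder = HttpUrl.parse("http://localhost:8092/xml").newBuilder();
    // A limit of 6 with pageSize 3 should yield two pages.
    PagePaginator pages = new PagePaginator(builder, 6, 3, "page", "pageSize");
    for (HttpUrl url : pages.getPaginatedUrls()) {
      System.out.println(url);  // inferred: ...?page=1&pageSize=3, then ...?page=2&pageSize=3
    }
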
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPaginator.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPaginator.java
new file mode 100644
index 0000000..c1feeb9
--- /dev/null
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPaginator.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.http;
+
+import okhttp3.HttpUrl;
+import org.apache.drill.exec.store.http.paginator.OffsetPaginator;
+import org.apache.drill.exec.store.http.paginator.PagePaginator;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * This class tests the functionality of the various paginator classes as it
+ * pertains to generating the correct URLs.
+ */
+public class TestPaginator {
+
+  @Test
+  public void testOffsetPaginator() {
+    String baseUrl = "https://myapi.com";
+    HttpUrl.Builder urlBuilder = HttpUrl.parse(baseUrl).newBuilder();
+
+    OffsetPaginator op = new OffsetPaginator(urlBuilder, 25, 5, "limit", "offset");
+    List<HttpUrl> urls = op.buildPaginatedURLs();
+    assertEquals(5, urls.size());
+
+    List<HttpUrl> expected = new ArrayList<>();
+    expected.add(HttpUrl.parse("https://myapi.com/?offset=0&limit=5"));
+    expected.add(HttpUrl.parse("https://myapi.com/?offset=5&limit=5"));
+    expected.add(HttpUrl.parse("https://myapi.com/?offset=10&limit=5"));
+    expected.add(HttpUrl.parse("https://myapi.com/?offset=15&limit=5"));
+    expected.add(HttpUrl.parse("https://myapi.com/?offset=20&limit=5"));
+
+    assertEquals(expected, urls);
+  }
+
+  @Test
+  public void testOffsetPaginatorIterator() {
+    String baseUrl = "https://myapi.com";
+    HttpUrl.Builder urlBuilder = HttpUrl.parse(baseUrl).newBuilder();
+
+    OffsetPaginator op = new OffsetPaginator(urlBuilder, 25, 5, "limit", "offset");
+    assertEquals(5, op.count());
+
+    String next = op.next();
+    assertEquals("https://myapi.com/?offset=0&limit=5", next);
+    next = op.next();
+    assertEquals("https://myapi.com/?offset=5&limit=5", next);
+    next = op.next();
+    assertEquals("https://myapi.com/?offset=10&limit=5", next);
+    next = op.next();
+    assertEquals("https://myapi.com/?offset=15&limit=5", next);
+    next = op.next();
+    assertEquals("https://myapi.com/?offset=20&limit=5", next);
+    next = op.next();
+    assertNull(next);
+  }
+
+  @Test
+  public void testPagePaginatorIterator() {
+    String baseUrl = "https://api.github.com/orgs/apache/repos";
+    HttpUrl.Builder urlBuilder = HttpUrl.parse(baseUrl).newBuilder();
+
+    PagePaginator pp = new PagePaginator(urlBuilder, 10, 2, "page", "per_page");
+    List<HttpUrl> urls = pp.getPaginatedUrls();
+    assertEquals(5, urls.size());
+
+    PagePaginator pp2 = new PagePaginator(urlBuilder, 10, 100, "page", "per_page");
+    List<HttpUrl> urls2 = pp2.getPaginatedUrls();
+    assertEquals(1, urls2.size());
+  }
+}
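Taken together, these tests document the paginator contract: count() reports how many pages
are needed to cover the configured limit, buildPaginatedURLs() materializes them all at once,
and next() hands them out one at a time, returning null once exhausted. A small usage sketch
assembled only from the calls exercised above; the println is illustrative:

    import okhttp3.HttpUrl;
    import org.apache.drill.exec.store.http.paginator.OffsetPaginator;

    HttpUrl.Builder builder = HttpUrl.parse("https://myapi.com").newBuilder();
    // A limit of 25 in pages of 5 yields offsets 0, 5, 10, 15 and 20.
    OffsetPaginator paginator = new OffsetPaginator(builder, 25, 5, "limit", "offset");
    String url;
    while ((url = paginator.next()) != null) {
      System.out.println(url);  // e.g. https://myapi.com/?offset=0&limit=5
    }
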
diff --git a/contrib/storage-http/src/test/resources/data/p1.json b/contrib/storage-http/src/test/resources/data/p1.json
new file mode 100644
index 0000000..467a458
--- /dev/null
+++ b/contrib/storage-http/src/test/resources/data/p1.json
@@ -0,0 +1,10 @@
+[
+  {
+    "col_1": 1.0,
+    "col_2": 2,
+    "col_3": "3.0"
+  }, {
+    "col_1": 4.0,
+    "col_2": 5,
+    "col_3": "6.0"
+  }]
diff --git a/contrib/storage-http/src/test/resources/data/p2.json b/contrib/storage-http/src/test/resources/data/p2.json
new file mode 100644
index 0000000..d96c16f
--- /dev/null
+++ b/contrib/storage-http/src/test/resources/data/p2.json
@@ -0,0 +1,10 @@
+[
+  {
+    "col_1": 7.0,
+    "col_2": 8,
+    "col_3": "9.0"
+  }, {
+    "col_1": 10.0,
+    "col_2": 11,
+    "col_3": "12.0"
+  }]
diff --git a/contrib/storage-http/src/test/resources/data/p3.json b/contrib/storage-http/src/test/resources/data/p3.json
new file mode 100644
index 0000000..d23a36c
--- /dev/null
+++ b/contrib/storage-http/src/test/resources/data/p3.json
@@ -0,0 +1,6 @@
+[
+  {
+    "col_1": 13.0,
+    "col_2":  14,
+    "col_3": "15.0"
+  }]
diff --git a/contrib/storage-http/src/test/resources/data/response_1.xml b/contrib/storage-http/src/test/resources/data/response_1.xml
new file mode 100644
index 0000000..d541cde
--- /dev/null
+++ b/contrib/storage-http/src/test/resources/data/response_1.xml
@@ -0,0 +1,45 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<CATALOG>
+  <PLANT>
+    <COMMON>Bloodroot</COMMON>
+    <BOTANICAL>Sanguinaria canadensis</BOTANICAL>
+    <ZONE>4</ZONE>
+    <LIGHT>Mostly Shady</LIGHT>
+    <PRICE>$2.44</PRICE>
+    <AVAILABILITY>031599</AVAILABILITY>
+  </PLANT>
+  <PLANT>
+    <COMMON>Columbine</COMMON>
+    <BOTANICAL>Aquilegia canadensis</BOTANICAL>
+    <ZONE>3</ZONE>
+    <LIGHT>Mostly Shady</LIGHT>
+    <PRICE>$9.37</PRICE>
+    <AVAILABILITY>030699</AVAILABILITY>
+  </PLANT>
+  <PLANT>
+    <COMMON>Marsh Marigold</COMMON>
+    <BOTANICAL>Caltha palustris</BOTANICAL>
+    <ZONE>4</ZONE>
+    <LIGHT>Mostly Sunny</LIGHT>
+    <PRICE>$6.81</PRICE>
+    <AVAILABILITY>051799</AVAILABILITY>
+  </PLANT>
+</CATALOG>
diff --git a/contrib/storage-http/src/test/resources/data/response_2.csv b/contrib/storage-http/src/test/resources/data/response_2.csv
new file mode 100644
index 0000000..e512faa
--- /dev/null
+++ b/contrib/storage-http/src/test/resources/data/response_2.csv
@@ -0,0 +1,3 @@
+"col1","col2","col3"
+"7","8","9"
+"10","11","12"
\ No newline at end of file
diff --git a/contrib/storage-http/src/test/resources/data/response_2.xml b/contrib/storage-http/src/test/resources/data/response_2.xml
new file mode 100644
index 0000000..5c00f57
--- /dev/null
+++ b/contrib/storage-http/src/test/resources/data/response_2.xml
@@ -0,0 +1,45 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<CATALOG>
+  <PLANT>
+    <COMMON>Cowslip</COMMON>
+    <BOTANICAL>Caltha palustris</BOTANICAL>
+    <ZONE>4</ZONE>
+    <LIGHT>Mostly Shady</LIGHT>
+    <PRICE>$9.90</PRICE>
+    <AVAILABILITY>030699</AVAILABILITY>
+  </PLANT>
+  <PLANT>
+    <COMMON>Dutchman's-Breeches</COMMON>
+    <BOTANICAL>Dicentra cucullaria</BOTANICAL>
+    <ZONE>3</ZONE>
+    <LIGHT>Mostly Shady</LIGHT>
+    <PRICE>$6.44</PRICE>
+    <AVAILABILITY>012099</AVAILABILITY>
+  </PLANT>
+  <PLANT>
+    <COMMON>Ginger, Wild</COMMON>
+    <BOTANICAL>Asarum canadense</BOTANICAL>
+    <ZONE>3</ZONE>
+    <LIGHT>Mostly Shady</LIGHT>
+    <PRICE>$9.03</PRICE>
+    <AVAILABILITY>041899</AVAILABILITY>
+  </PLANT>
+</CATALOG>
diff --git a/contrib/storage-http/src/test/resources/data/response_3.csv b/contrib/storage-http/src/test/resources/data/response_3.csv
new file mode 100644
index 0000000..70c8117
--- /dev/null
+++ b/contrib/storage-http/src/test/resources/data/response_3.csv
@@ -0,0 +1,3 @@
+"col1","col2","col3"
+"13","14","15"
+"16","17","18"
\ No newline at end of file
diff --git a/contrib/storage-http/src/test/resources/data/response_3.xml b/contrib/storage-http/src/test/resources/data/response_3.xml
new file mode 100644
index 0000000..f8f4a43
--- /dev/null
+++ b/contrib/storage-http/src/test/resources/data/response_3.xml
@@ -0,0 +1,37 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<CATALOG>
+  <PLANT>
+    <COMMON>Hepatica</COMMON>
+    <BOTANICAL>Hepatica americana</BOTANICAL>
+    <ZONE>4</ZONE>
+    <LIGHT>Mostly Shady</LIGHT>
+    <PRICE>$4.45</PRICE>
+    <AVAILABILITY>012699</AVAILABILITY>
+  </PLANT>
+  <PLANT>
+    <COMMON>Liverleaf</COMMON>
+    <BOTANICAL>Hepatica americana</BOTANICAL>
+    <ZONE>4</ZONE>
+    <LIGHT>Mostly Shady</LIGHT>
+    <PRICE>$3.99</PRICE>
+    <AVAILABILITY>010299</AVAILABILITY>
+  </PLANT>
+</CATALOG>
diff --git a/contrib/storage-http/src/test/resources/data/response_4.csv b/contrib/storage-http/src/test/resources/data/response_4.csv
new file mode 100644
index 0000000..92ca804
--- /dev/null
+++ b/contrib/storage-http/src/test/resources/data/response_4.csv
@@ -0,0 +1,2 @@
+"col1","col2","col3"
+"19","20","21"
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ImplicitColumnUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ImplicitColumnUtils.java
index 626debd..055c459 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ImplicitColumnUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ImplicitColumnUtils.java
@@ -35,7 +35,7 @@ public class ImplicitColumnUtils {
 
   /**
    * This class represents an implicit column in a dataset.  These columns are typically used for metadata that is consistent
-   * across an entire dataset.  A filename for example, or HTTP response codes.  It is good practice to name
+   * across an entire dataset.  A filename, for example, or HTTP response codes.  It is a good practice to name
    * implicit fields with an underscore so that these field names do not conflict with fields from the user's
    * data.  For example _http_response_code.
    *
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
index b6df9ae..edc687f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
@@ -298,6 +298,9 @@ public class JsonLoaderImpl implements JsonLoader, ErrorFactory {
           implicitFields.writeImplicitColumns();
         }
         rowWriter.save();
+      } else if (rowWriter.limitReached(maxRows)) {
+        eof = true;
+        break;
       } else {
         // Special case for empty data sets to still record implicit columns
         if (implicitFields != null) {