Posted to commits@drill.apache.org by pr...@apache.org on 2020/05/01 00:05:13 UTC

[drill] branch master updated: DRILL-7711: Add data path, parameter filter pushdown to HTTP plugin

This is an automated email from the ASF dual-hosted git repository.

progers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 0582e99  DRILL-7711: Add data path, parameter filter pushdown to HTTP plugin
0582e99 is described below

commit 0582e99e7308691f1e254069901da14cd4d4ca46
Author: Paul Rogers <pa...@yahoo.com>
AuthorDate: Sat Apr 18 12:46:45 2020 -0700

    DRILL-7711: Add data path, parameter filter pushdown to HTTP plugin
    
    Adds an option to specify the path to the data so that the plugin
    ignores REST message "overhead" and reads only the actual data.
    
    Allows specifying HTTP URL parameters as filter push-downs from
    SQL.
    
    * Revised scan cost model to allow a scan more freedom
      to define cost. Needed to ensure that filter push-down
      is actually accepted by Calcite.
    * Reduced size of HTTP scan plan by including only the one
      needed connection config rather than all of them.
    * Revised URL building so that the proxy sees the full
      URL after adding parameters, etc.
    * Many code refinements.
    * Added more details to README
---
 .../org/apache/drill/common/PlanStringBuilder.java |   5 +
 contrib/storage-http/README.md                     | 420 ++++++++++++++++-----
 .../exec/store/http/HttpAPIConnectionSchema.java   |  19 +-
 .../{HttpAPIConfig.java => HttpApiConfig.java}     |  90 +++--
 .../drill/exec/store/http/HttpBatchReader.java     |  99 ++++-
 .../drill/exec/store/http/HttpGroupScan.java       | 172 +++++++--
 .../exec/store/http/HttpPushDownListener.java      | 158 ++++++++
 .../exec/store/http/HttpScanBatchCreator.java      |  12 +-
 .../apache/drill/exec/store/http/HttpScanSpec.java |  46 ++-
 .../drill/exec/store/http/HttpSchemaFactory.java   |  82 ++--
 .../drill/exec/store/http/HttpStoragePlugin.java   |  27 +-
 .../exec/store/http/HttpStoragePluginConfig.java   |  31 +-
 .../apache/drill/exec/store/http/HttpSubScan.java  |  36 +-
 .../exec/store/http/filter/ConstantHolder.java     | 384 +++++++++++++++++++
 .../drill/exec/store/http/filter/ExprNode.java     | 234 ++++++++++++
 .../store/http/filter/FilterPushDownListener.java  | 149 ++++++++
 .../store/http/filter/FilterPushDownStrategy.java  | 323 ++++++++++++++++
 .../store/http/filter/FilterPushDownUtils.java     | 322 ++++++++++++++++
 .../apache/drill/exec/store/http/filter/RelOp.java |  98 +++++
 .../drill/exec/store/http/util/SimpleHttp.java     | 146 +++----
 .../drill/exec/store/http/TestHttpPlugin.java      | 274 ++++++++++----
 .../apache/drill/exec/physical/base/ScanStats.java |  36 +-
 .../exec/physical/resultSet/impl/ColumnState.java  |  12 +-
 .../drill/exec/planner/cost/DrillCostBase.java     |   8 +-
 .../drill/exec/planner/logical/DrillScanRel.java   |  74 ++--
 .../drill/exec/store/AbstractSchemaFactory.java    |   1 -
 .../store/easy/json/loader/JsonLoaderImpl.java     |   7 +
 .../easy/json/parser/JsonStructureParser.java      |  46 ++-
 .../exec/store/easy/json/parser/MessageParser.java |   1 +
 .../exec/store/easy/json/parser/RootParser.java    | 139 +++++--
 .../easy/json/parser/SimpleMessageParser.java      |  30 +-
 .../store/easy/json/parser/BaseTestJsonParser.java |  10 +
 .../easy/json/parser/TestJsonParserErrors.java     |  22 +-
 .../easy/json/parser/TestJsonParserMessage.java    |  60 +--
 34 files changed, 3054 insertions(+), 519 deletions(-)

diff --git a/common/src/main/java/org/apache/drill/common/PlanStringBuilder.java b/common/src/main/java/org/apache/drill/common/PlanStringBuilder.java
index 7d06eaf..ef04a9a 100644
--- a/common/src/main/java/org/apache/drill/common/PlanStringBuilder.java
+++ b/common/src/main/java/org/apache/drill/common/PlanStringBuilder.java
@@ -107,6 +107,11 @@ public class PlanStringBuilder {
     return field(key, StringEscapeUtils.escapeJava(value));
   }
 
+  public PlanStringBuilder maskedField(String key, String value) {
+    // Intentionally ignore length
+    return field(key, value == null ? null : "*******");
+  }
+
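+  // Usage sketch: a plugin config's toString() can call
+  //   builder.maskedField("password", password)
+  // (as HttpApiConfig.toString() does later in this commit) so that plan
+  // strings show the password as "*******" rather than the actual secret.
+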
   private void startField(String key) {
     if (fieldCount++ != 0) {
       buf.append(", ");
diff --git a/contrib/storage-http/README.md b/contrib/storage-http/README.md
index 3b9965c..0daa74a 100644
--- a/contrib/storage-http/README.md
+++ b/contrib/storage-http/README.md
@@ -1,9 +1,11 @@
 # Generic REST API Storage Plugin
 
-This plugin is intended to enable you to query APIs over HTTP/REST. At this point, the API reader will only accept JSON as input however in the future, it may be possible to add additional format readers to allow for APIs which return XML, CSV or other formats.
+The HTTP storage plugin lets you query APIs over HTTP/REST. The plugin
+expects JSON responses.
 
-Note:  This plugin should **NOT** be used for interacting with tools which have REST APIs such as Splunk or Solr. It will not be performant for those use cases.
+The HTTP plugin is new in Drill 1.18 and is an Alpha feature. It works well, and we
+encourage you to use it and provide feedback. However, we reserve the right to change
+the plugin based on that feedback.
 
 ## Configuration
 
@@ -15,7 +17,7 @@ To configure the plugin, create a new storage plugin, and add the following conf
   "cacheResults": true,
   "connections": {},
   "timeout": 0,
-  "proxyHost": null, 
+  "proxyHost": null,
   "proxyPort": 0,
   "proxyType": null,
   "proxyUsername": null,
@@ -24,46 +26,216 @@ To configure the plugin, create a new storage plugin, and add the following conf
 }
 ```
 The required options are:
+
 * `type`:  This should be `http`
 * `cacheResults`:  Enable caching of the HTTP responses.  Defaults to `false`
 * `timeout`:  Sets the response timeout in seconds. Defaults to `0` which is no timeout.
 * `connections`:  This field contains the details for individual connections. See the section *Configuring API Connections for Details*.
 
-You can configure Drill to work behind a corporate proxy. Details are listed below. 
+You can configure Drill to work behind a corporate proxy. Details are listed below.
 
 ### Configuring the API Connections
 
 The HTTP Storage plugin allows you to configure multiple APIs which you can query directly from this plugin. To do so, first add a `connections` parameter to the configuration. Next, give the connection a name, which will be used in queries. For instance, `stockAPI` or `jira`.
 
-The `connection` can accept the following options:
-* `url`: The base URL which Drill will query. You should include the ending slash if there are additional arguments which you are passing.
-* `method`: The request method. Must be `get` or `post`. Other methods are not allowed and will default to `GET`.
-* `headers`: Often APIs will require custom headers as part of the authentication. This field allows you to define key/value pairs which are submitted with the http request.  The format is:
+The `connection` property can accept the following options.
+
+#### URL
+
+`url`: The base URL which Drill will query.
+
+`requireTail`: Set to `true` if the query must contain an additional part of the service
+URL as a table name, `false` if the URL needs no additional suffix other than that
+provided by `WHERE` clause filters. (See below.)
+
+
+If your service requires parameters, you have three choices. Suppose your connection is called
+`sunrise`. First, you can include the parameters directly in your URL if they are fixed for a
+given service:
+
+```json
+url: "https://api.sunrise-sunset.org/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02",
+requireTail: false
+```
+
+Query your table like this:
+
+```sql
+SELECT * FROM api.sunrise;
+```
+
+Second, you can specify the base URL here and the full URL in your query. Use this form if the
+parameters define a table-like concept (the set of data to return):
+
+```json
+url: "https://api.sunrise-sunset.org/json",
+requireTail: true
+```
+
+SQL query:
+
+```sql
+SELECT * FROM api.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02`
+```
+
+If the URL requires a tail, specify it as if it were a table name. (See example
+below.) Drill directly appends the "tail" to the base URL to create the final URL.
+
+Third, you can use the `params` field below to specify the parameters as filters
+if the parameters specify which data sets to return:
+
+```json
+url: "https://api.sunrise-sunset.org/json",
+requireTail: false,
+params: ["lat", "lng", "date"]
+```
+
+SQL query:
+
+```sql
+SELECT * FROM api.sunrise
+WHERE `lat` = 36.7201600 AND `lng` = -4.4203400 AND `date` = '2019-10-02'
+```
+
+In this case, Drill appends the parameters to the URL, adding a question mark
+to separate the two.
+
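+For example, the query above should produce the URL:
+
+```
+https://api.sunrise-sunset.org/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02
+```
+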
+#### Method
+
+`method`: The request method. Must be `GET` or `POST`. Other methods are not allowed and will default to `GET`.
+
+`postBody`: Contains data, in the form of key value pairs, which are sent during a `POST` request.
+The post body should be in the form of a block of text with key/value pairs:
+
+```json
+postBody: "key1=value1
+key2=value2"
+```
+
+#### Headers
+
+`headers`: Often APIs will require custom headers as part of the authentication. This field allows
+you to define key/value pairs which are submitted with the http request. The format is:
+
 ```json
 headers: {
    "key1": "Value1",
    "key2": "Value2"
+   }
+```
+
+#### Query Parameters as Filters
+
+* `params`: Allows you to map SQL `WHERE` clause conditions to query parameters.
+
+```json
+url: "https://api.sunrise-sunset.org/json",
+params: ["lat", "lng", "date"]
+```
+
+SQL query:
+
+```sql
+SELECT * FROM api.sunrise
+WHERE `lat` = 36.7201600 AND `lng` = -4.4203400 AND `date` = '2019-10-02'
+```
+
+HTTP parameters are untyped; Drill converts any value you provide into a string.
+Drill allows you to use any data type which can convert unambiguously to a string:
+`BIT`, `INT`, `BIGINT`, `FLOAT4`, `FLOAT8`, `VARDECIMAL`, `VARCHAR`. The `BIT` type
+is translated to `true` and `false`. Note that none of the date or interval types
+are allowed: each of those requires formatting.
+
+Note the need for back-tick quotes around the names;
+`date` is a reserved word. Notice also that the date is a string because of
+the formatting limitation mentioned above.
+
+Only equality conditions can be translated to parameters. The above filters are
+translated to:
+
+```
+lat=36.7201600&lng=-4.4203400&date=2019-10-02
+```
+
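+As a hypothetical illustration of the `BIT` conversion, suppose a boolean
+parameter named `is_daytime` were listed in `params`:
+
+```sql
+SELECT * FROM api.sunrise WHERE `is_daytime` = true
+```
+
+Drill would send this condition as `is_daytime=true`.
+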
+If your query contains other conditions (`!=`, `<`, etc.) then those conditions are applied
+in Drill after the REST service returns the data.
+
+Only fields listed in the `params` config field will become parameters; all other
+expressions are handled within Drill as explained above.
+
+At present, Drill requires the values to be literals (constants). Drill does not
+currently allow expressions. That is, the following will not become an HTTP parameter:
+
+```sql
+WHERE `lat` = 36 + 0.7201600
+```
+
+Drill will add parameters to the URL in the order listed in the config. Use this
+feature if the API is strict about parameter ordering.
+
+At present Drill does not enforce that parameters are provided in the query: Drill
+assumes parameters are optional.
+
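+For example, this query omits the `date` parameter; Drill simply leaves it
+out of the URL, producing `?lat=36.7201600&lng=-4.4203400`:
+
+```sql
+SELECT * FROM api.sunrise
+WHERE `lat` = 36.7201600 AND `lng` = -4.4203400
+```
+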
+#### Data Path
+
+REST responses often have structure beyond the data you want to query. For example:
+
+```json
+ "results":
+ {
+   "sunrise":"7:27:02 AM",
+   "sunset":"5:05:55 PM",
+   "solar_noon":"12:16:28 PM",
+   "day_length":"9:38:53",
+   "civil_twilight_begin":"6:58:14 AM",
+   "civil_twilight_end":"5:34:43 PM",
+   "nautical_twilight_begin":"6:25:47 AM",
+   "nautical_twilight_end":"6:07:10 PM",
+   "astronomical_twilight_begin":"5:54:14 AM",
+   "astronomical_twilight_end":"6:38:43 PM"
+ },
+  "status":"OK"
 }
 ```
-* `authType`: If your API requires authentication, specify the authentication type. At the time of implementation, the plugin only supports basic authentication, however, the plugin will likely support OAUTH2 in the future. Defaults to `none`. If the `authType` is set to `basic`, `username` and `password` must be set in the configuration as well.
- * `username`: The username for basic authentication.
- * `password`: The password for basic authentication.
- * `postBody`: Contains data, in the form of key value pairs, which are sent during a `POST` request. Post body should be in the form:
- ```
-key1=value1
-key2=value2
+
+Drill can handle JSON structures such as the above; you can use SQL to obtain the
+results you want. However, the SQL will be simpler if we skip over the portions we
+don't want and simply read the `results` fields as our SQL fields. We do that with
+the `dataPath` configuration:
+
+```json
+dataPath: "results"
 ```
 
+The `dataPath` can contain any number of fields, for example: `"response/content/data"`.
+Drill will ignore all JSON content outside of the data path.
+
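+For instance, given the hypothetical message below, setting
+`dataPath: "response/content/data"` tells Drill to read only the objects
+under `data`:
+
+```json
+{
+  "response": {
+    "status": "OK",
+    "content": {
+      "data": [
+        { "sunrise": "7:27:02 AM", "sunset": "5:05:55 PM" }
+      ]
+    }
+  }
+}
+```
+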
+At present, there is no provision to check the `status` code in a response such
+as that shown above. Drill assumes that the server will use HTTP status codes to
+indicate a bad request or other error.
+
+#### Authorization
+
+`authType`: If your API requires authentication, specify the authentication
+type. At the time of implementation, the plugin only supports basic authentication, however, the
+plugin will likely support OAUTH2 in the future. Defaults to `none`.
+If the `authType` is set to `basic`, `username` and `password` must be set in the configuration as well.
+
+`username`: The username for basic authentication.
+
+`password`: The password for basic authentication.
+
 ## Usage
 
-This plugin is different from other plugins in that it the table component of the `FROM` clause is different. In normal Drill queries, the `FROM` clause is constructed as follows:
+This plugin differs from other plugins in the structure of the table component of the `FROM`
+clause. In normal Drill queries, the `FROM` clause is constructed as follows:
+
 ```sql
 FROM <storage plugin>.<schema>.<table>
 ```
 For example, you might have:
+
 ```sql
 FROM dfs.test.`somefile.csv`
 
@@ -72,13 +244,18 @@ FROM dfs.test.`somefile.csv`
 FROM mongo.stats.sales_data
 ```
 
-The HTTP/REST plugin the `FROM` clause enables you to pass arguments to your REST call. The structure is:
+In the HTTP/REST plugin, the `FROM` clause enables you to pass arguments to your REST call
+if you set the `requireTail` property to `true`. The structure is:
+
 ```sql
 FROM <plugin>.<connection>.<arguments>
 --Actual example:
- FROM http.sunrise.`/json?lat=36.7201600&lng=-4.4203400&date=today`
+FROM http.sunrise.`/json?lat=36.7201600&lng=-4.4203400&date=today`
 ```
 
+Or, as explained above, you can have the URL act like a table and pass parameters
+using `WHERE` clause "filter" conditions.
+
 ## Proxy Setup
 
 Some users access HTTP services from behind a proxy firewall. Drill provides three ways to specify proxy
@@ -98,7 +275,9 @@ handle proxies.
 ### Boot Configuration
 
 You can also specify proxy configuration in the `drill-override.conf` file.
-See `drill-override-example.conf` for a template.
+See `drill-override-example.conf` for a template. Use the boot configuration
+if the proxy is an attribute of your network environment. Doing so ensures that every
+Drillbit and every HTTP/HTTPS request uses the same proxy configuration.
 
 First, you can use the same form of URL you would use with the environment
 variables:
@@ -126,9 +305,6 @@ as `none`.
 
 Again, there is a parallel section for HTTPS.
 
-Either of these approaches is preferred if the proxy is an attribute of your
-network environment and is the same for all external HTTP/HTTPS requests.
-
 ### In the HTTP Storage Plugin Config
 
 The final way to configure proxy is in the HTTP storage plugin itself. The proxy
@@ -170,34 +346,36 @@ The API sunrise-sunset.org returns data in the following format:
   "status":"OK"
 }
 ```
+
 To query this API, set the configuration as follows:
 
 ```json
-
- {
-   "type": "http",
-   "cacheResults": false,
-   "enabled": true,
-   "timeout": 5,
-   "connections": {
-     "sunrise": {
-       "url": "https://api.sunrise-sunset.org/",
-       "method": "GET",
-       "headers": null,
-       "authType": "none",
-       "userName": null,
-       "password": null,
-       "postBody": null
-     }
-   }
-}
+{
+  "type": "http",
+  "cacheResults": false,
+  "enabled": true,
+  "timeout": 5,
+  "connections": {
+    "sunrise": {
+      "url": "https://api.sunrise-sunset.org/json",
+      "requireTail": true,
+      "method": "GET",
+      "headers": null,
+      "authType": "none",
+      "userName": null,
+      "password": null,
+      "postBody": null
+    }
+  }
+}
 
 ```
+
 Then, to execute a query:
+
 ```sql
-    SELECT api_results.results.sunrise AS sunrise,
-    api_results.results.sunset AS sunset
-    FROM http.sunrise.`/json?lat=36.7201600&lng=-4.4203400&date=today` AS api_results;
+SELECT api_results.results.sunrise AS sunrise,
+       api_results.results.sunset AS sunset
+FROM   http.sunrise.`?lat=36.7201600&lng=-4.4203400&date=today` AS api_results;
 ```
 Which yields the following results:
 ```
@@ -209,29 +387,61 @@ Which yields the following results:
 1 row selected (0.632 seconds)
 ```
 
-### Example 2: JIRA
+#### Using Parameters
 
-JIRA Cloud has a REST API which is [documented here](https://developer.atlassian.com/cloud/jira/platform/rest/v3/?utm_source=%2Fcloud%2Fjira%2Fplatform%2Frest%2F&utm_medium=302).
+We can improve the above configuration to use `WHERE` clause filters and
+a `dataPath` to skip over the unwanted parts of the message
+body. Set the configuration as follows:
 
-To connect Drill to JIRA Cloud, use the following configuration:
 ```json
 {
   "type": "http",
   "cacheResults": false,
+  "enabled": true,
   "timeout": 5,
   "connections": {
     "sunrise": {
-      "url": "https://api.sunrise-sunset.org/",
+      "url": "https://api.sunrise-sunset.org/json",
+      "requireTail": false,
       "method": "GET",
+      "dataPath": "results",
       "headers": null,
+      "params": [ "lat", "lng", "date" ],
       "authType": "none",
       "userName": null,
       "password": null,
       "postBody": null
-    },
+    }
+  }
+}
+
+```
+Then, to execute a query:
+
+```sql
+SELECT sunrise, sunset
+FROM   http.sunrise
+WHERE  `lat` = 36.7201600 AND `lng` = -4.4203400 AND `date` = 'today'
+```
+
+Which yields the same results as before.
+
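+Behind the scenes, Drill builds the same URL as in the tail version of this
+example; this is also the URL reported in error messages and in the Drill log:
+
+```
+https://api.sunrise-sunset.org/json?lat=36.7201600&lng=-4.4203400&date=today
+```
+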
+### Example 2: JIRA
+
+JIRA Cloud has a REST API which is
+[documented here](https://developer.atlassian.com/cloud/jira/platform/rest/v3/?utm_source=%2Fcloud%2Fjira%2Fplatform%2Frest%2F&utm_medium=302).
+
+To connect Drill to JIRA Cloud, use the following configuration:
+
+```json
+{
+  "type": "http",
+  "cacheResults": false,
+  "timeout": 5,
+  "connections": {
     "jira": {
       "url": "https://<project>.atlassian.net/rest/api/3/",
       "method": "GET",
+      "dataPath": "issues",
       "headers": {
         "Accept": "application/json"
       },
@@ -245,36 +455,32 @@ To connect Drill to JIRA Cloud, use the following configuration:
 }
 ```
 
-Once you've configured Drill to query the API, you can now easily access any of your data in JIRA. The JIRA API returns highly nested data, however with a little preparation, it is pretty straightforward to transform it into a more useful table. For instance, the query below:
+Once you've configured Drill to query the API, you can now easily access any of your data in JIRA.
+The JIRA API returns highly nested data, however with a little preparation, it
+is pretty straightforward to transform it into a more useful table. For instance, the
+query below:
+
 ```sql
-SELECT jira_data.issues.key AS key,
-jira_data.issues.fields.issueType.name AS issueType,
-SUBSTR(jira_data.issues.fields.created, 1, 10) AS created,
-SUBSTR(jira_data.issues.fields.updated, 1, 10) AS updated,
-jira_data.issues.fields.assignee.displayName as assignee,
-jira_data.issues.fields.creator.displayName as creator,
-jira_data.issues.fields.summary AS summary,
-jira_data.issues.fields.status.name AS currentStatus,
-jira_data.issues.fields.priority.name AS priority,
-jira_data.issues.fields.labels AS labels,
-jira_data.issues.fields.subtasks AS subtasks
-FROM (
-SELECT flatten(t1.issues) as issues
-FROM http.jira.`search?jql=project=<project>&&maxResults=100` AS t1
-) AS jira_data
+SELECT key,
+       t.fields.issueType.name AS issueType,
+       SUBSTR(t.fields.created, 1, 10) AS created,
+       SUBSTR(t.fields.updated, 1, 10) AS updated,
+       t.fields.assignee.displayName as assignee,
+       t.fields.creator.displayName as creator,
+       t.fields.summary AS summary,
+       t.fields.status.name AS currentStatus,
+       t.fields.priority.name AS priority,
+       t.fields.labels AS labels,
+       t.fields.subtasks AS subtasks
+FROM http.jira.`search?jql=project%20%3D%20<project>&maxResults=100` AS t
 ```
+
 The query below counts the number of issues by priority:
 
 ```sql
-SELECT
-jira_data.issues.fields.priority.name AS priority,
-COUNT(*) AS issue_count
-FROM (
-SELECT flatten(t1.issues) as issues
-FROM http.jira.`search?jql=project=<project>&maxResults=100` AS t1
-) AS jira_data
+SELECT t.fields.priority.name AS priority,
+       COUNT(*) AS issue_count
+FROM http.jira.`search?jql=project%20%3D%20<project>&maxResults=100` AS t
 GROUP BY priority
 ORDER BY issue_count DESC
 ```
@@ -284,18 +490,62 @@ ORDER BY issue_count DESC
 
 ## Limitations
 
-1. The plugin is supposed to follow redirects, however if you are using Authentication, you may encounter errors or empty responses if you are counting on the endpoint for
+1. The plugin is supposed to follow redirects, however if you are using authentication,
+   you may encounter errors or empty responses if you are counting on the endpoint for
    redirection.
 
-2. At this time, the plugin does not support any authentication other than basic authentication. Future functionality may include OAUTH2 authentication and/or PKI
-  authentication for REST APIs.
+2. At this time, the plugin does not support any authentication other than basic authentication.
+
+3. This plugin does not implement join filter pushdowns (only constant pushdowns are
+   supported). Join pushdown has the potential to improve performance if you use the HTTP service
+   joined to another table.
+
+4. This plugin only reads JSON responses.
+
+5. `POST` bodies can only be in the format of key/value pairs. Some APIs accept
+    JSON based `POST` bodies but this is not currently supported.
+
+6. When using `dataPath`, the returned message should be a single JSON object. The field
+   pointed to by the `dataPath` should contain a single JSON object or an array of objects.
+
+7. When not using `dataPath`, the response should be a single JSON object, an array of
+   JSON objects, or a series of line-delimited JSON objects (the so-called
+   [jsonlines](http://jsonlines.org/) format; see the example after this list).
+
+8. Parameters are considered optional; no error will be given if a query omits
+   parameters. An enhancement would be to mark parameters as required: all are required
+   or just some. If parameters are required, but omitted, the REST service will
+   likely return an error.
+
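+The jsonlines format mentioned in item 7 is simply one JSON object per line.
+A minimal sketch:
+
+```json
+{"sunrise": "7:27:02 AM", "sunset": "5:05:55 PM"}
+{"sunrise": "7:28:01 AM", "sunset": "5:04:48 PM"}
+```
+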
+## Troubleshooting
 
-3. This plugin does not implement filter pushdowns. Filter pushdown has the potential to improve performance.
+If anything goes wrong, Drill will provide a detailed error message, including the URL:
 
-4. This plugin only reads JSON responses. Future functionality may include the ability to parse XML, CSV or other common rest responses.
+```
+DATA_READ ERROR: Failed to read the HTTP response body
+
+Error message: Read timed out
+Connection: sunrise
+Plugin: api
+URL: https://api.sunrise-sunset.org/json?lat=36.7201600&lng=-4.4203400&date=today
+Fragment: 0:0
+```
+
+If using a "tail" in the query, verify that the tail is quoted using back-ticks
+as shown in the examples.
+
+Check that the URL is correct. If it is not, check the plugin configuration properties
+described above to find out why the pieces were not assembled as you intended.
+
+If the query works but delivers unexpected results, check the Drill log file.
+Drill logs a message like the following at the info level when opening the HTTP connection:
+
+```
+Connection: sunrise, Method: GET,
+  URL: https://api.sunrise-sunset.org/json?lat=36.7201600&lng=-4.4203400&date=today
+```
 
-5. At this time `POST` bodies can only be in the format of key/value pairs. Some APIs accept JSON based `POST` bodies and this is not currently supported.
+If the query runs but produces odd results, try a simple `SELECT *` query. This may reveal
+whether there is unexpected message content in addition to the data. Use the `dataPath` property
+to ignore the extra content.
 
-6. The returned message should contain only records, as a JSON array of objects (or as a series of JSON objects as in a JSON file). The
-   present version does not yet have the ability to ignore message "overhead" such as status codes, etc.  You can of course, select individual fields in your query to ignore
-    "overhead" fields. 
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpAPIConnectionSchema.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpAPIConnectionSchema.java
index d3fa51f..81efda1 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpAPIConnectionSchema.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpAPIConnectionSchema.java
@@ -18,10 +18,11 @@
 package org.apache.drill.exec.store.http;
 
 import org.apache.calcite.schema.Table;
+import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.http.HttpSchemaFactory.HttpSchema;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
@@ -31,18 +32,14 @@ import java.util.Set;
  */
 public class HttpAPIConnectionSchema extends AbstractSchema {
 
-  private final Map<String, DynamicDrillTable> activeTables = new HashMap<>();
-
   private final HttpStoragePlugin plugin;
+  private final Map<String, DynamicDrillTable> activeTables = CaseInsensitiveMap.newHashMap();
 
-  private final String pluginName;
-
-  public HttpAPIConnectionSchema(HttpSchemaFactory.HttpSchema httpSchema,
+  public HttpAPIConnectionSchema(HttpSchema parent,
                                  String name,
                                  HttpStoragePlugin plugin) {
-    super(httpSchema.getSchemaPath(), name);
+    super(parent.getSchemaPath(), name);
     this.plugin = plugin;
-    pluginName = plugin.getName();
   }
 
   @Override
@@ -61,13 +58,15 @@ public class HttpAPIConnectionSchema extends AbstractSchema {
    */
   @Override
   public Table getTable(String tableName) {
-    DynamicDrillTable table = activeTables.get(name);
+    DynamicDrillTable table = activeTables.get(tableName);
     if (table != null) {
       // Return the found table
       return table;
     } else {
       // Register a new table
-      return registerTable(name, new DynamicDrillTable(plugin, pluginName, new HttpScanSpec(pluginName, name, tableName, plugin.getConfig())));
+      return registerTable(tableName, new DynamicDrillTable(plugin, plugin.getName(),
+          new HttpScanSpec(plugin.getName(), name, tableName,
+              plugin.getConfig().copyForPlan(name))));
     }
   }
 
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpAPIConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java
similarity index 63%
rename from contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpAPIConfig.java
rename to contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java
index a850d1d..e93ff92 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpAPIConfig.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java
@@ -19,33 +19,56 @@ package org.apache.drill.exec.store.http;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
+
 import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import org.apache.parquet.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
-public class HttpAPIConfig {
-  private static final Logger logger = LoggerFactory.getLogger(HttpAPIConfig.class);
+public class HttpApiConfig {
+  private static final Logger logger = LoggerFactory.getLogger(HttpApiConfig.class);
 
   private final String url;
 
-  private final HttpMethods method;
+  /**
+   * Whether this API configuration represents a schema (with the
+   * table providing additional parts of the URL), or if this
+   * API represents a table (the URL is complete except for
+   * parameters specified in the WHERE clause.)
+   */
+  private final boolean requireTail;
+
+  private final HttpMethod method;
+
+  private final String postBody;
 
   private final Map<String, String> headers;
 
-  private final String authType;
+  /**
+   * List of query parameters which can be used in the SQL WHERE clause
+   * to push filters to the REST request as HTTP query parameters.
+   */
+  private final List<String> params;
 
-  private final String userName;
+  /**
+   * Path within the message to the JSON object, or array of JSON
+   * objects, which contain the actual data. Allows a request to
+   * skip over "overhead" such as status codes. Must be a slash-delimited
+   * set of JSON field names.
+   */
+  private final String dataPath;
 
+  private final String authType;
+  private final String userName;
   private final String password;
 
-  private final String postBody;
-
-  public enum HttpMethods {
+  public enum HttpMethod {
     /**
      * Value for HTTP GET method
      */
@@ -56,17 +79,21 @@ public class HttpAPIConfig {
     POST;
   }
 
-  public HttpAPIConfig(@JsonProperty("url") String url,
+  public HttpApiConfig(@JsonProperty("url") String url,
                        @JsonProperty("method") String method,
                        @JsonProperty("headers") Map<String, String> headers,
                        @JsonProperty("authType") String authType,
                        @JsonProperty("userName") String userName,
                        @JsonProperty("password") String password,
-                       @JsonProperty("postBody") String postBody) {
+                       @JsonProperty("postBody") String postBody,
+                       @JsonProperty("params") List<String> params,
+                       @JsonProperty("dataPath") String dataPath,
+                       @JsonProperty("requireTail") Boolean requireTail) {
 
     this.headers = headers;
     this.method = Strings.isNullOrEmpty(method)
-        ? HttpMethods.GET : HttpMethods.valueOf(method.trim().toUpperCase());
+        ? HttpMethod.GET : HttpMethod.valueOf(method.trim().toUpperCase());
+    this.url = url;
 
     // Get the request method.  Only accept GET and POST requests.  Anything else will default to GET.
     switch (this.method) {
@@ -86,19 +113,18 @@ public class HttpAPIConfig {
         .build(logger);
     }
 
-    // Put a trailing slash on the URL if it is missing
-    if (url.charAt(url.length() - 1) != '/') {
-      this.url = url + "/";
-    } else {
-      this.url = url;
-    }
-
     // Get the authentication method. Future functionality will include OAUTH2 authentication but for now
     // Accept either basic or none.  The default is none.
     this.authType = Strings.isNullOrEmpty(authType) ? "none" : authType;
     this.userName = userName;
     this.password = password;
     this.postBody = postBody;
+    this.params = params == null || params.isEmpty() ? null :
+      ImmutableList.copyOf(params);
+    this.dataPath = Strings.isNullOrEmpty(dataPath) ? null : dataPath;
+
+    // Default to true for backward compatibility with first PR.
+    this.requireTail = requireTail == null ? true : requireTail;
   }
 
   @JsonProperty("url")
@@ -122,26 +148,39 @@ public class HttpAPIConfig {
   @JsonProperty("postBody")
   public String postBody() { return postBody; }
 
+  @JsonProperty("params")
+  public List<String> params() { return params; }
+
+  @JsonProperty("dataPath")
+  public String dataPath() { return dataPath; }
+
+  @JsonProperty("requireTail")
+  public boolean requireTail() { return requireTail; }
+
   @JsonIgnore
-  public HttpMethods getMethodType() {
-    return HttpMethods.valueOf(this.method());
+  public HttpMethod getMethodType() {
+    return HttpMethod.valueOf(this.method());
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(url, method, headers, authType, userName, password, postBody);
+    return Objects.hash(url, method, requireTail, params, headers,
+        authType, userName, password, postBody);
   }
 
   @Override
   public String toString() {
     return new PlanStringBuilder(this)
       .field("url", url)
+      .field("require tail", requireTail)
       .field("method", method)
+      .field("dataPath", dataPath)
       .field("headers", headers)
       .field("authType", authType)
       .field("username", userName)
-      .field("password", password)
+      .maskedField("password", password)
       .field("postBody", postBody)
+      .field("filterFields", params)
       .toString();
   }
 
@@ -153,13 +192,16 @@ public class HttpAPIConfig {
     if (obj == null || getClass() != obj.getClass()) {
       return false;
     }
-    HttpAPIConfig other = (HttpAPIConfig) obj;
+    HttpApiConfig other = (HttpApiConfig) obj;
     return Objects.equals(url, other.url)
       && Objects.equals(method, other.method)
       && Objects.equals(headers, other.headers)
       && Objects.equals(authType, other.authType)
       && Objects.equals(userName, other.userName)
       && Objects.equals(password, other.password)
-      && Objects.equals(postBody, other.postBody);
+      && Objects.equals(postBody, other.postBody)
+      && Objects.equals(params, other.params)
+      && Objects.equals(dataPath, other.dataPath)
+      && Objects.equals(requireTail, other.requireTail);
   }
 }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
index 7aaa1e8..7f0efd9 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
@@ -18,12 +18,17 @@
 package org.apache.drill.exec.store.http;
 
 import java.io.File;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Map;
 
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.ChildErrorContext;
 import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
-import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.store.easy.json.loader.JsonLoader;
 import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder;
 import org.apache.drill.exec.store.http.util.HttpProxyConfig;
@@ -32,45 +37,99 @@ import org.apache.drill.exec.store.http.util.SimpleHttp;
 
 import com.typesafe.config.Config;
 
+import okhttp3.HttpUrl;
+import okhttp3.HttpUrl.Builder;
+
 public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
-  private final HttpStoragePluginConfig config;
   private final HttpSubScan subScan;
   private JsonLoader jsonLoader;
 
-  public HttpBatchReader(HttpStoragePluginConfig config, HttpSubScan subScan) {
-    this.config = config;
+  public HttpBatchReader(HttpSubScan subScan) {
     this.subScan = subScan;
   }
 
   @Override
   public boolean open(SchemaNegotiator negotiator) {
-    CustomErrorContext errorContext = negotiator.parentErrorContext();
 
     // Result set loader setup
     String tempDirPath = negotiator
         .drillConfig()
         .getString(ExecConstants.DRILL_TMP_DIR);
-    ResultSetLoader loader = negotiator.build();
+
+    HttpUrl url = buildUrl();
+
+    CustomErrorContext errorContext = new ChildErrorContext(negotiator.parentErrorContext()) {
+      @Override
+      public void addContext(UserException.Builder builder) {
+        super.addContext(builder);
+        builder.addContext("URL", url.toString());
+      }
+    };
+    negotiator.setErrorContext(errorContext);
 
     // Http client setup
-    SimpleHttp http = new SimpleHttp(config, new File(tempDirPath), subScan.tableSpec().database(), proxySettings(negotiator.drillConfig()), errorContext);
+    SimpleHttp http = new SimpleHttp(
+        subScan, url,
+        new File(tempDirPath),
+        proxySettings(negotiator.drillConfig(), url),
+        errorContext);
 
     // JSON loader setup
-    jsonLoader = new JsonLoaderBuilder()
-        .resultSetLoader(loader)
-        .standardOptions(negotiator.queryOptions())
-        .errorContext(errorContext)
-        .fromStream(http.getInputStream(subScan.getFullURL()))
-        .build();
-
-    // Please read the first batch
-    return true;
+    InputStream inStream = http.getInputStream();
+    try {
+      jsonLoader = new JsonLoaderBuilder()
+          .resultSetLoader(negotiator.build())
+          .standardOptions(negotiator.queryOptions())
+          .dataPath(subScan.tableSpec().connectionConfig().dataPath())
+          .errorContext(errorContext)
+          .fromStream(inStream)
+          .build();
+    } catch (Throwable t) {
+
+      // Paranoia: ensure stream is closed if anything goes wrong.
+      // After this, the JSON loader will close the stream.
+      AutoCloseables.closeSilently(inStream);
+      throw t;
+    }
+
+    return true; // Please read the first batch
+  }
+
+  private HttpUrl buildUrl() {
+    HttpApiConfig apiConfig = subScan.tableSpec().connectionConfig();
+    String baseUrl = apiConfig.url();
+
+    // Append table name, if available.
+    if (subScan.tableSpec().tableName() != null) {
+      baseUrl += subScan.tableSpec().tableName();
+    }
+    HttpUrl.Builder urlBuilder = HttpUrl.parse(baseUrl).newBuilder();
+    if (apiConfig.params() != null && !apiConfig.params().isEmpty() &&
+        subScan.filters() != null) {
+      addFilters(urlBuilder, apiConfig.params(), subScan.filters());
+    }
+    return urlBuilder.build();
+  }
+
+  /**
+   * Convert equality filter conditions into HTTP query parameters
+   * Parameters must appear in the order defined in the config.
+   */
+  private void addFilters(Builder urlBuilder, List<String> params,
+      Map<String, String> filters) {
+    for (String param : params) {
+      String value = filters.get(param);
+      if (value != null) {
+        urlBuilder.addQueryParameter(param, value);
+      }
+    }
   }
 
- private HttpProxyConfig proxySettings(Config drillConfig) {
-    ProxyBuilder builder = HttpProxyConfig.builder()
-        .fromConfigForURL(drillConfig, subScan.getFullURL());
-    String proxyType = config.proxyType();
+  private HttpProxyConfig proxySettings(Config drillConfig, HttpUrl url) {
+    final HttpStoragePluginConfig config = subScan.tableSpec().config();
+    final ProxyBuilder builder = HttpProxyConfig.builder()
+        .fromConfigForURL(drillConfig, url.toString());
+    final String proxyType = config.proxyType();
     if (proxyType != null && !"direct".equals(proxyType)) {
       builder
         .type(config.proxyType())
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
index 9e4e86d..ff0f7d6 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.http;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -31,10 +32,10 @@ import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.ScanStats;
-import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
 import org.apache.drill.exec.physical.base.SubScan;
-import org.apache.drill.exec.planner.cost.DrillCostBase;
+import org.apache.drill.exec.planner.logical.DrillScanRel;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.util.Utilities;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
 
@@ -43,58 +44,104 @@ public class HttpGroupScan extends AbstractGroupScan {
 
   private final List<SchemaPath> columns;
   private final HttpScanSpec httpScanSpec;
-  private final HttpStoragePluginConfig config;
+  private final Map<String, String> filters;
+  private final ScanStats scanStats;
+  private final double filterSelectivity;
 
-  public HttpGroupScan (
-    HttpStoragePluginConfig config,
-    HttpScanSpec scanSpec,
-    List<SchemaPath> columns) {
+  // Used only in planner, not serialized
+  private int hashCode;
+
+  /**
+   * Creates a new group scan from the storage plugin.
+   */
+  public HttpGroupScan (HttpScanSpec scanSpec) {
     super("no-user");
-    this.config = config;
     this.httpScanSpec = scanSpec;
-    this.columns = columns;
+    this.columns = ALL_COLUMNS;
+    this.filters = null;
+    this.filterSelectivity = 0.0;
+    this.scanStats = computeScanStats();
   }
 
+  /**
+   * Copies the group scan during many stages of Calcite operation.
+   */
   public HttpGroupScan(HttpGroupScan that) {
     super(that);
-    config = that.config();
-    httpScanSpec = that.httpScanSpec();
-    columns = that.getColumns();
+    this.httpScanSpec = that.httpScanSpec;
+    this.columns = that.columns;
+    this.filters = that.filters;
+    this.filterSelectivity = that.filterSelectivity;
+
+    // Calcite makes many copies in the later stage of planning
+    // without changing anything. Retain the previous stats.
+    this.scanStats = that.scanStats;
   }
 
+  /**
+   * Applies columns. Oddly called multiple times, even when
+   * the scan already has columns.
+   */
   public HttpGroupScan(HttpGroupScan that, List<SchemaPath> columns) {
-    super("no-user");
+    super(that);
     this.columns = columns;
-    this.config = that.config;
     this.httpScanSpec = that.httpScanSpec;
+
+    // Oddly called later in planning, after earlier assigning columns,
+    // to again assign columns. Retain filters, but compute new stats.
+    this.filters = that.filters;
+    this.filterSelectivity = that.filterSelectivity;
+    this.scanStats = computeScanStats();
+  }
+
+  /**
+   * Adds a filter to the scan.
+   */
+  public HttpGroupScan(HttpGroupScan that, Map<String, String> filters,
+      double filterSelectivity) {
+    super(that);
+    this.columns = that.columns;
+    this.httpScanSpec = that.httpScanSpec;
+
+    // Applies a filter.
+    this.filters = filters;
+    this.filterSelectivity = filterSelectivity;
+    this.scanStats = computeScanStats();
   }
 
+  /**
+   * Deserialize a group scan. Not called in normal operation. Probably used
+   * only if Drill executes a logical plan.
+   */
   @JsonCreator
   public HttpGroupScan(
-    @JsonProperty("config") HttpStoragePluginConfig config,
     @JsonProperty("columns") List<SchemaPath> columns,
-    @JsonProperty("httpScanSpec") HttpScanSpec httpScanSpec
+    @JsonProperty("httpScanSpec") HttpScanSpec httpScanSpec,
+    @JsonProperty("filters") Map<String, String> filters,
+    @JsonProperty("filterSelectivity") double selectivity
   ) {
     super("no-user");
-    this.config = config;
     this.columns = columns;
     this.httpScanSpec = httpScanSpec;
+    this.filters = filters;
+    this.filterSelectivity = selectivity;
+    this.scanStats = computeScanStats();
   }
 
-  @JsonProperty("config")
-  public HttpStoragePluginConfig config() { return config; }
-
   @JsonProperty("columns")
   public List<SchemaPath> columns() { return columns; }
 
   @JsonProperty("httpScanSpec")
   public HttpScanSpec httpScanSpec() { return httpScanSpec; }
 
+  @JsonProperty("filters")
+  public Map<String, String> filters() { return filters; }
+
+  @JsonProperty("filterSelectivity")
+  public double selectivity() { return filterSelectivity; }
+
   @Override
-  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
-    // No filter pushdowns yet, so this method does nothing
-    return;
-  }
+  public void applyAssignments(List<DrillbitEndpoint> endpoints) { }
 
   @Override
   @JsonIgnore
@@ -107,9 +154,14 @@ public class HttpGroupScan extends AbstractGroupScan {
     return true;
   }
 
+  @JsonIgnore
+  public HttpApiConfig getHttpConfig() {
+    return httpScanSpec.connectionConfig();
+  }
+
   @Override
   public SubScan getSpecificScan(int minorFragmentId) {
-    return new HttpSubScan(config, httpScanSpec, columns);
+    return new HttpSubScan(httpScanSpec, columns, filters);
   }
 
   @Override
@@ -118,6 +170,7 @@ public class HttpGroupScan extends AbstractGroupScan {
   }
 
   @Override
+  @JsonIgnore
   public String getDigest() {
     return toString();
   }
@@ -130,26 +183,71 @@ public class HttpGroupScan extends AbstractGroupScan {
 
   @Override
   public ScanStats getScanStats() {
-    int estRowCount = 10_000;
-    int rowWidth = columns == null ? 200 : 100;
-    int estDataSize = estRowCount * 200 * rowWidth;
-    int estCpuCost = DrillCostBase.PROJECT_CPU_COST;
-    return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT,
-        estRowCount, estCpuCost, estDataSize);
+
+    // Since this class is immutable, compute stats once and cache
+    // them. If the scan changes (adding columns, adding filters), we
+    // get a new scan without cached stats.
+    return scanStats;
+  }
+
+  private ScanStats computeScanStats() {
+
+    // If this config allows filters, then make the default
+    // cost very high to force the planner to choose the version
+    // with filters.
+    if (allowsFilters() && !hasFilters()) {
+      return new ScanStats(ScanStats.GroupScanProperty.ESTIMATED_TOTAL_COST,
+          1E9, 1E12, 1E12);
+    }
+
+    // No good estimates at all, just make up something.
+    double estRowCount = 10_000;
+
+    // NOTE: this is important! If the predicates don't make the query more
+    // efficient, they won't get pushed down.
+    if (hasFilters()) {
+      estRowCount *= filterSelectivity;
+    }
+
+    double estColCount = Utilities.isStarQuery(columns) ? DrillScanRel.STAR_COLUMN_COST : columns.size();
+    double valueCount = estRowCount * estColCount;
+    double cpuCost = valueCount;
+    double ioCost = valueCount;
+
+    // Force the caller to use our costs rather than the
+    // defaults (which sets IO cost to zero).
+    return new ScanStats(ScanStats.GroupScanProperty.ESTIMATED_TOTAL_COST,
+        estRowCount, cpuCost, ioCost);
+  }
+
+  @JsonIgnore
+  public boolean hasFilters() {
+    return filters != null;
   }
 
   @Override
   public String toString() {
     return new PlanStringBuilder(this)
-      .field("httpScanSpec", httpScanSpec)
+      .field("scan spec", httpScanSpec)
       .field("columns", columns)
-      .field("httpStoragePluginConfig", config)
+      .field("filters", filters)
       .toString();
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(httpScanSpec, columns, config);
+
+    // Hash code is cached since Calcite calls this method many times.
+    if (hashCode == 0) {
+      // Don't include cost; it is derived.
+      hashCode = Objects.hash(httpScanSpec, columns, filters);
+    }
+    return hashCode;
+  }
+
+  @JsonIgnore
+  public boolean allowsFilters() {
+    return getHttpConfig().params() != null;
   }
 
   @Override
@@ -160,9 +258,11 @@ public class HttpGroupScan extends AbstractGroupScan {
     if (obj == null || getClass() != obj.getClass()) {
       return false;
     }
+
+    // Don't include cost; it is derived.
     HttpGroupScan other = (HttpGroupScan) obj;
     return Objects.equals(httpScanSpec, other.httpScanSpec())
-      && Objects.equals(columns, other.columns())
-      && Objects.equals(config, other.config());
+        && Objects.equals(columns, other.columns())
+        && Objects.equals(filters, other.filters());
   }
 }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPushDownListener.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPushDownListener.java
new file mode 100644
index 0000000..c7cd73a
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPushDownListener.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.http;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Pair;
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.http.filter.ExprNode;
+import org.apache.drill.exec.store.http.filter.ExprNode.AndNode;
+import org.apache.drill.exec.store.http.filter.ExprNode.ColRelOpConstNode;
+import org.apache.drill.exec.store.http.filter.ExprNode.OrNode;
+import org.apache.drill.exec.store.http.filter.FilterPushDownListener;
+import org.apache.drill.exec.store.http.filter.FilterPushDownStrategy;
+
+/**
+ * The HTTP storage plugin accepts filters which are:
+ * <ul>
+ * <li>A single column = value expression, where the column is
+ * a filter column from the config, or</li>
+ * <li>An AND'ed set of such expressions,</li>
+ * <li>If the value is one with an unambiguous conversion to
+ * a string. (That is, not dates, binary, maps, etc.)</li>
+ * </ul>
+ */
+public class HttpPushDownListener implements FilterPushDownListener {
+
+  public static Set<StoragePluginOptimizerRule> rulesFor(
+      OptimizerRulesContext optimizerRulesContext) {
+    return FilterPushDownStrategy.rulesFor(
+        new HttpPushDownListener());
+  }
+
+  @Override
+  public String prefix() {
+    return "Http";
+  }
+
+  @Override
+  public boolean isTargetScan(GroupScan groupScan) {
+    return groupScan instanceof HttpGroupScan;
+  }
+
+  @Override
+  public ScanPushDownListener builderFor(GroupScan groupScan) {
+    HttpGroupScan httpScan = (HttpGroupScan) groupScan;
+    if (httpScan.hasFilters() || !httpScan.allowsFilters()) {
+      return null;
+    } else {
+      return new HttpScanPushDownListener(httpScan);
+    }
+  }
+
+  private static class HttpScanPushDownListener implements ScanPushDownListener {
+
+    private final HttpGroupScan groupScan;
+    private final Map<String, String> filterParams = CaseInsensitiveMap.newHashMap();
+
+    HttpScanPushDownListener(HttpGroupScan groupScan) {
+      this.groupScan = groupScan;
+      for (String field : groupScan.getHttpConfig().params()) {
+        filterParams.put(field, field);
+      }
+    }
+
+    @Override
+    public ExprNode accept(ExprNode node) {
+      if (node instanceof OrNode) {
+        return null;
+      } else if (node instanceof ColRelOpConstNode) {
+        return acceptRelOp((ColRelOpConstNode) node);
+      } else {
+        return null;
+      }
+    }
+
+    /**
+     * Only accept equality conditions.
+     */
+    private ColRelOpConstNode acceptRelOp(ColRelOpConstNode relOp) {
+      switch (relOp.op) {
+      case EQ:
+        return acceptColumn(relOp.colName) &&
+               acceptType(relOp.value.type) ? relOp : null;
+      default:
+        return null;
+      }
+    }
+
+    /**
+     * Only accept columns in the filter params list.
+     */
+    private boolean acceptColumn(String colName) {
+      return filterParams.containsKey(colName);
+    }
+
+    /**
+     * Only accept types which have an unambiguous mapping to
+     * String.
+     */
+    private boolean acceptType(MinorType type) {
+      switch (type) {
+      case BIGINT:
+      case BIT:
+      case FLOAT4:
+      case FLOAT8:
+      case INT:
+      case SMALLINT:
+      case VARCHAR:
+      case VARDECIMAL:
+        return true;
+      default:
+        return false;
+      }
+    }
+
+    /**
+     * Convert the equality nodes to a map of param/string pairs using
+     * the case specified in the storage plugin config.
+     */
+    @Override
+    public Pair<GroupScan, List<RexNode>> transform(AndNode andNode) {
+      Map<String, String> filters = new HashMap<>();
+      double selectivity = 1;
+      for (ExprNode expr : andNode.children) {
+        ColRelOpConstNode relOp = (ColRelOpConstNode) expr;
+        filters.put(filterParams.get(relOp.colName), relOp.value.value.toString());
+        selectivity *= relOp.op.selectivity();
+      }
+      HttpGroupScan newScan = new HttpGroupScan(groupScan, filters, selectivity);
+      return Pair.of(newScan, Collections.emptyList());
+    }
+  }
+}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
index 46d9838..effe0d3 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
@@ -56,7 +56,6 @@ public class HttpScanBatchCreator implements BatchCreator<HttpSubScan> {
 
   private ScanFrameworkBuilder createBuilder(OptionManager options,
       HttpSubScan subScan) {
-    HttpStoragePluginConfig config = subScan.config();
     ScanFrameworkBuilder builder = new ScanFrameworkBuilder();
     builder.projection(subScan.columns());
     builder.setUserName(subScan.getUserName());
@@ -66,12 +65,13 @@ public class HttpScanBatchCreator implements BatchCreator<HttpSubScan> {
         new ChildErrorContext(builder.errorContext()) {
           @Override
           public void addContext(UserException.Builder builder) {
-            builder.addContext("URL", subScan.getFullURL());
+            builder.addContext("Connection", subScan.tableSpec().connection());
+            builder.addContext("Plugin", subScan.tableSpec().pluginName());
           }
         });
 
     // Reader
-    ReaderFactory readerFactory = new HttpReaderFactory(config, subScan);
+    ReaderFactory readerFactory = new HttpReaderFactory(subScan);
     builder.setReaderFactory(readerFactory);
     builder.nullType(Types.optional(MinorType.VARCHAR));
     return builder;
@@ -79,12 +79,10 @@ public class HttpScanBatchCreator implements BatchCreator<HttpSubScan> {
 
   private static class HttpReaderFactory implements ReaderFactory {
 
-    private final HttpStoragePluginConfig config;
     private final HttpSubScan subScan;
     private int count;
 
-    public HttpReaderFactory(HttpStoragePluginConfig config, HttpSubScan subScan) {
-      this.config = config;
+    public HttpReaderFactory(HttpSubScan subScan) {
       this.subScan = subScan;
     }
 
@@ -96,7 +94,7 @@ public class HttpScanBatchCreator implements BatchCreator<HttpSubScan> {
 
       // Only a single scan (in a single thread)
       if (count++ == 0) {
-        return new HttpBatchReader(config, subScan);
+        return new HttpBatchReader(subScan);
       } else {
         return null;
       }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanSpec.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanSpec.java
index 19921bc..f2cab11 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanSpec.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanSpec.java
@@ -26,28 +26,30 @@ import org.apache.drill.common.PlanStringBuilder;
 @JsonTypeName("http-scan-spec")
 public class HttpScanSpec {
 
-  protected final String schemaName;
-
-  protected final String database;
-
-  protected final String tableName;
-
-  protected final HttpStoragePluginConfig config;
+  private final String pluginName;
+  private final String connectionName;
+  private final String tableName;
+  private final HttpStoragePluginConfig config;
 
   @JsonCreator
-  public HttpScanSpec(@JsonProperty("schemaName") String schemaName,
-                      @JsonProperty("database") String database,
+  public HttpScanSpec(@JsonProperty("pluginName") String pluginName,
+                      @JsonProperty("connection") String connectionName,
                       @JsonProperty("tableName") String tableName,
                       @JsonProperty("config") HttpStoragePluginConfig config) {
-    this.schemaName = schemaName;
-    this.database = database;
+    this.pluginName = pluginName;
+    this.connectionName = connectionName;
     this.tableName = tableName;
     this.config = config;
   }
 
-  @JsonProperty("database")
-  public String database() {
-    return database;
+  @JsonProperty("pluginName")
+  public String pluginName() {
+    return pluginName;
+  }
+
+  @JsonProperty("connection")
+  public String connection() {
+    return connectionName;
   }
 
   @JsonProperty("tableName")
@@ -55,16 +57,26 @@ public class HttpScanSpec {
     return tableName;
   }
 
+  @JsonProperty("config")
+  public HttpStoragePluginConfig config() {
+    return config;
+  }
+
   @JsonIgnore
   public String getURL() {
-    return database;
+    return connectionName;
+  }
+
+  @JsonIgnore
+  public HttpApiConfig connectionConfig() {
+    return config.getConnection(connectionName);
   }
 
   @Override
   public String toString() {
     return new PlanStringBuilder(this)
-      .field("schemaName", schemaName)
-      .field("database", database)
+      .field("schemaName", pluginName)
+      .field("database", connectionName)
       .field("tableName", tableName)
       .field("config", config)
       .toString();
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSchemaFactory.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSchemaFactory.java
index c111d3d..7610806 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSchemaFactory.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSchemaFactory.java
@@ -18,12 +18,14 @@
 package org.apache.drill.exec.store.http;
 
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
+import java.util.Map.Entry;
 
 import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.SchemaConfig;
@@ -35,67 +37,87 @@ public class HttpSchemaFactory extends AbstractSchemaFactory {
 
   private final HttpStoragePlugin plugin;
 
-  public HttpSchemaFactory(HttpStoragePlugin plugin, String schemaName) {
-    super(schemaName);
+  public HttpSchemaFactory(HttpStoragePlugin plugin) {
+    super(plugin.getName());
     this.plugin = plugin;
   }
 
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
-    HttpSchema schema = new HttpSchema(getName());
+    HttpSchema schema = new HttpSchema(plugin);
     logger.debug("Registering {} {}", schema.getName(), schema.toString());
 
     SchemaPlus schemaPlus = parent.add(getName(), schema);
     schema.setHolder(schemaPlus);
   }
 
-  class HttpSchema extends AbstractSchema {
+  protected static class HttpSchema extends AbstractSchema {
 
-    public HttpSchema(String name) {
-      super(Collections.emptyList(), name);
-    }
+    private final HttpStoragePlugin plugin;
+    private final Map<String, HttpAPIConnectionSchema> subSchemas = CaseInsensitiveMap.newHashMap();
+    private final Map<String, HttpApiConfig> tables = CaseInsensitiveMap.newHashMap();
+    private final Map<String, DynamicDrillTable> activeTables = CaseInsensitiveMap.newHashMap();
 
-    void setHolder(SchemaPlus plusOfThis) {
-      for (String s : getSubSchemaNames()) {
-        plusOfThis.add(s, getSubSchemaKnownExists(s));
+    public HttpSchema(HttpStoragePlugin plugin) {
+      super(Collections.emptyList(), plugin.getName());
+      this.plugin = plugin;
+      for (Entry<String, HttpApiConfig> entry : plugin.getConfig().connections().entrySet()) {
+        String configName = entry.getKey();
+        HttpApiConfig config = entry.getValue();
+        if (config.requireTail()) {
+          subSchemas.put(configName, new HttpAPIConnectionSchema(this, configName, plugin));
+        } else {
+          tables.put(configName, config);
+        }
       }
     }
 
-    @Override
-    public Set<String> getSubSchemaNames() {
-      HttpStoragePluginConfig config = plugin.getConfig();
-      Map<String, HttpAPIConfig> connections = config.connections();
-      Set<String> subSchemaNames = new HashSet<>();
-
-      // Get the possible subschemas.
-      for (Map.Entry<String, HttpAPIConfig> entry : connections.entrySet()) {
-        subSchemaNames.add(entry.getKey());
+    void setHolder(SchemaPlus plusOfThis) {
+      for (Entry<String, HttpAPIConnectionSchema> entry : subSchemas.entrySet()) {
+        plusOfThis.add(entry.getKey(), entry.getValue());
       }
-      return subSchemaNames;
     }
 
     @Override
     public AbstractSchema getSubSchema(String name) {
-      if (plugin.getConfig().connections().containsKey(name)) {
-        return getSubSchemaKnownExists(name);
+      HttpAPIConnectionSchema subSchema = subSchemas.get(name);
+      if (subSchema != null) {
+        return subSchema;
+      } else if (tables.containsKey(name)) {
+        return null;
       } else {
         throw UserException
           .connectionError()
-          .message("API '{}' does not exist in HTTP Storage plugin '{}'", name, getName())
+          .message("API '%s' does not exist in HTTP storage plugin '%s'", name, getName())
           .build(logger);
       }
     }
 
-    /**
-     * Helper method to get subschema when we know it exists (already checked the existence)
-     */
-    private HttpAPIConnectionSchema getSubSchemaKnownExists(String name) {
-      return new HttpAPIConnectionSchema(this, name, plugin);
+    @Override
+    public Table getTable(String name) {
+      DynamicDrillTable table = activeTables.get(name);
+      if (table != null) {
+        return table;
+      }
+      HttpApiConfig config = tables.get(name);
+      if (config != null) {
+        // Register a new table
+        return registerTable(name, new DynamicDrillTable(plugin, plugin.getName(),
+            new HttpScanSpec(plugin.getName(), name, null,
+                plugin.getConfig().copyForPlan(name))));
+      } else {
+        return null; // Unknown table
+      }
     }
 
     @Override
     public String getTypeName() {
       return HttpStoragePluginConfig.NAME;
     }
+
+    private DynamicDrillTable registerTable(String name, DynamicDrillTable table) {
+      activeTables.put(name, table);
+      return table;
+    }
   }
 }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePlugin.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePlugin.java
index c660a2e..fde9623 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePlugin.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePlugin.java
@@ -17,14 +17,21 @@
  */
 package org.apache.drill.exec.store.http;
 
+import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.JSONOptions;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.planner.PlannerPhase;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.http.filter.FilterPushDownUtils;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+
 import com.fasterxml.jackson.core.type.TypeReference;
 import java.io.IOException;
+import java.util.Set;
 
 public class HttpStoragePlugin extends AbstractStoragePlugin {
 
@@ -35,7 +42,7 @@ public class HttpStoragePlugin extends AbstractStoragePlugin {
   public HttpStoragePlugin(HttpStoragePluginConfig configuration, DrillbitContext context, String name) {
     super(context, name);
     this.config = configuration;
-    this.schemaFactory = new HttpSchemaFactory(this, name);
+    this.schemaFactory = new HttpSchemaFactory(this);
   }
 
   @Override
@@ -56,6 +63,20 @@ public class HttpStoragePlugin extends AbstractStoragePlugin {
   @Override
   public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
     HttpScanSpec scanSpec = selection.getListWith(context.getLpPersistence().getMapper(), new TypeReference<HttpScanSpec>() {});
-    return new HttpGroupScan(config, scanSpec, null);
+    return new HttpGroupScan(scanSpec);
   }
-}
+
+  @Override
+  public Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext optimizerContext, PlannerPhase phase) {
+
+    // Push-down planning is done at the logical phase so it can
+    // influence parallelization in the physical phase. Note that many
+    // existing plugins perform filter push-down at the physical
+    // phase, which also works fine if push-down is independent of
+    // parallelization.
+    if (FilterPushDownUtils.isFilterPushDownPhase(phase)) {
+      return HttpPushDownListener.rulesFor(optimizerContext);
+    } else {
+      return ImmutableSet.of();
+    }
+  }
+}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
index 2b65bb9..620fc9f 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
@@ -23,11 +23,13 @@ import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.common.logical.StoragePluginConfigBase;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
@@ -38,7 +40,7 @@ public class HttpStoragePluginConfig extends StoragePluginConfigBase {
 
   public static final String NAME = "http";
 
-  public final Map<String, HttpAPIConfig> connections;
+  public final Map<String, HttpApiConfig> connections;
 
   public final boolean cacheResults;
 
@@ -59,7 +61,7 @@ public class HttpStoragePluginConfig extends StoragePluginConfigBase {
 
   @JsonCreator
   public HttpStoragePluginConfig(@JsonProperty("cacheResults") Boolean cacheResults,
-                                 @JsonProperty("connections") Map<String, HttpAPIConfig> connections,
+                                 @JsonProperty("connections") Map<String, HttpApiConfig> connections,
                                  @JsonProperty("timeout") Integer timeout,
                                  @JsonProperty("proxyHost") String proxyHost,
                                  @JsonProperty("proxyPort") Integer proxyPort,
@@ -107,6 +109,22 @@ public class HttpStoragePluginConfig extends StoragePluginConfigBase {
     return value.isEmpty() ? null : value;
   }
 
+  /**
+   * Create a copy of the plugin config with only the indicated connection.
+   * The copy is used in the query plan to avoid including unnecessary information.
+   */
+  public HttpStoragePluginConfig copyForPlan(String connectionName) {
+    return new HttpStoragePluginConfig(
+        cacheResults, configFor(connectionName), timeout,
+        proxyHost, proxyPort, proxyType, proxyUsername, proxyPassword);
+  }
+
+  private Map<String, HttpApiConfig> configFor(String connectionName) {
+    Map<String, HttpApiConfig> single = new HashMap<>();
+    single.put(connectionName, getConnection(connectionName));
+    return single;
+  }
+
   @Override
   public boolean equals(Object that) {
     if (this == that) {
@@ -133,7 +151,7 @@ public class HttpStoragePluginConfig extends StoragePluginConfigBase {
       .field("proxyHost", proxyHost)
       .field("proxyPort", proxyPort)
       .field("proxyUsername", proxyUsername)
-      .field("proxyPassword", proxyPassword)
+      .maskedField("proxyPassword", proxyPassword)
       .field("proxyType", proxyType)
       .toString();
   }
@@ -148,7 +166,7 @@ public class HttpStoragePluginConfig extends StoragePluginConfigBase {
   public boolean cacheResults() { return cacheResults; }
 
   @JsonProperty("connections")
-  public Map<String, HttpAPIConfig> connections() { return connections; }
+  public Map<String, HttpApiConfig> connections() { return connections; }
 
   @JsonProperty("timeout")
   public int timeout() { return timeout;}
@@ -167,4 +185,9 @@ public class HttpStoragePluginConfig extends StoragePluginConfigBase {
 
   @JsonProperty("proxyType")
   public String proxyType() { return proxyType; }
+
+  @JsonIgnore
+  public HttpApiConfig getConnection(String connectionName) {
+    return connections.get(connectionName);
+  }
 }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java
index 700ad70..1706c7c 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.store.http;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -38,19 +39,20 @@ import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
 public class HttpSubScan extends AbstractBase implements SubScan {
 
   private final HttpScanSpec tableSpec;
-  private final HttpStoragePluginConfig config;
   private final List<SchemaPath> columns;
+  private final Map<String, String> filters;
 
   @JsonCreator
   public HttpSubScan(
-    @JsonProperty("config") HttpStoragePluginConfig config,
     @JsonProperty("tableSpec") HttpScanSpec tableSpec,
-    @JsonProperty("columns") List<SchemaPath> columns) {
+    @JsonProperty("columns") List<SchemaPath> columns,
+    @JsonProperty("filters") Map<String, String> filters) {
     super("user-if-needed");
-    this.config = config;
     this.tableSpec = tableSpec;
     this.columns = columns;
+    this.filters = filters;
   }
+
   @JsonProperty("tableSpec")
   public HttpScanSpec tableSpec() {
     return tableSpec;
@@ -61,21 +63,9 @@ public class HttpSubScan extends AbstractBase implements SubScan {
     return columns;
   }
 
-  @JsonProperty("config")
-  public HttpStoragePluginConfig config() {
-    return config;
-  }
-
-  @JsonIgnore
-  public String getURL() {
-    return tableSpec.getURL();
-  }
-
-  @JsonIgnore
-  public String getFullURL() {
-    String selectedConnection = tableSpec.database();
-    String url = config.connections().get(selectedConnection).url();
-    return url + tableSpec.tableName();
+  @JsonProperty("filters")
+  public Map<String, String> filters() {
+    return filters;
   }
 
  @Override
@@ -86,7 +76,7 @@ public class HttpSubScan extends AbstractBase implements SubScan {
 
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
-    return new HttpSubScan(config, tableSpec, columns);
+    return new HttpSubScan(tableSpec, columns, filters);
   }
 
   @Override
@@ -105,13 +95,13 @@ public class HttpSubScan extends AbstractBase implements SubScan {
     return new PlanStringBuilder(this)
       .field("tableSpec", tableSpec)
       .field("columns", columns)
-      .field("config", config)
+      .field("filters", filters)
       .toString();
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(tableSpec,columns,config);
+    return Objects.hash(tableSpec, columns, filters);
   }
 
   @Override
@@ -125,6 +115,6 @@ public class HttpSubScan extends AbstractBase implements SubScan {
     HttpSubScan other = (HttpSubScan) obj;
     return Objects.equals(tableSpec, other.tableSpec)
       && Objects.equals(columns, other.columns)
-      && Objects.equals(config, other.config);
+      && Objects.equals(filters, other.filters);
   }
 }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/filter/ConstantHolder.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/filter/ConstantHolder.java
new file mode 100644
index 0000000..4350868
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/filter/ConstantHolder.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.http.filter;
+
+import java.math.BigDecimal;
+
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+
+/**
+ * Description of a constant argument of an expression.
+ * Each constant is a (type, value) pair.
+ */
+
+@JsonPropertyOrder({"type", "value"})
+public class ConstantHolder implements Comparable<ConstantHolder> {
+  @JsonProperty("type")
+  public final MinorType type;
+  @JsonProperty("value")
+  public final Object value;
+
+  @JsonCreator
+  public ConstantHolder(
+      @JsonProperty("type") MinorType type,
+      @JsonProperty("value") Object value) {
+    this.type = type;
+    this.value = value;
+  }
+
+  public static ConstantHolder bitValue(boolean value) {
+    return new ConstantHolder(MinorType.BIT, value);
+  }
+
+  public static ConstantHolder smallIntValue(int value) {
+    return new ConstantHolder(MinorType.SMALLINT, (short) value);
+  }
+
+  public static ConstantHolder intValue(int value) {
+    return new ConstantHolder(MinorType.INT, value);
+  }
+
+  public static ConstantHolder bigIntValue(long value) {
+    return new ConstantHolder(MinorType.BIGINT, value);
+  }
+
+  public static ConstantHolder float4Value(float value) {
+    return new ConstantHolder(MinorType.FLOAT4, value);
+  }
+
+  public static ConstantHolder float8Value(double value) {
+    return new ConstantHolder(MinorType.FLOAT8, value);
+  }
+
+  public static ConstantHolder decimalValue(BigDecimal value) {
+    return new ConstantHolder(MinorType.VARDECIMAL, value);
+  }
+
+  public static ConstantHolder varcharValue(String value) {
+    return new ConstantHolder(MinorType.VARCHAR, value);
+  }
+
+  /**
+   * Convert a constant to the given type. Conversion is defined only for
+   * some types (where conversion makes sense) and only for some values
+   * (only those that would result in a valid conversion).
+   *
+   * @param toType the target type
+   * @return a constant of the requested type
+   * @throws RuntimeException if the conversion is not legal
+   * @see {@link #normalize(MinorType)} for a "save" version of this
+   * method
+   */
+  public ConstantHolder convertTo(MinorType toType) {
+    if (type == toType) {
+      return this;
+    }
+    switch (toType) {
+    case INT:
+      return toInt();
+    case BIGINT:
+      return toBigInt();
+    case TIMESTAMP:
+      return toTimestamp(null);
+    case VARCHAR:
+      return toVarChar();
+    case FLOAT4:
+      return toFloat();
+    case FLOAT8:
+      return toDouble();
+    case VARDECIMAL:
+      return toDecimal();
+    default:
+      throw conversionError(toType);
+    }
+  }
+
+  /**
+   * Normalize the constant to the given type. Return null if the constant
+   * cannot be converted. Use this test to determine if the constant is of
+   * a form that can be pushed down when push-down only supports certain
+   * types. In such a case, the query will likely fail at execution time
+   * when Drill tries to compare the remaining filter with an incompatible
+   * column type.
+   *
+   * @param toType the target type
+   * @return a constant of the requested type or null if the conversion
+   * is not defined or is not legal for the given value
+   */
+  public ConstantHolder normalize(MinorType toType) {
+    try {
+      return convertTo(toType);
+    } catch (RuntimeException e) {
+      return null;
+    }
+  }
+
+  public ConstantHolder toInt() {
+    int intValue;
+    switch (type) {
+      case SMALLINT:
+        intValue = (Short) value;
+        break;
+      case INT:
+        return this;
+      case BIGINT: {
+        long value = (long) this.value;
+        if (value < Integer.MIN_VALUE || value > Integer.MAX_VALUE) {
+          throw conversionError(MinorType.INT);
+        }
+        intValue = (int) value;
+        break;
+      }
+      case VARCHAR:
+        try {
+          intValue = Integer.parseInt((String) value);
+        } catch (NumberFormatException e) {
+          throw conversionError(MinorType.INT);
+        }
+        break;
+      case VARDECIMAL:
+        try {
+          intValue = ((BigDecimal) value).intValueExact();
+        } catch (ArithmeticException e) {
+          throw conversionError(MinorType.INT);
+        }
+        break;
+      default:
+        throw conversionError(MinorType.INT);
+    }
+    return new ConstantHolder(MinorType.INT, intValue);
+  }
+
+  public ConstantHolder toBigInt() {
+    long longValue;
+    switch (type) {
+      case SMALLINT:
+        longValue = (Short) value;
+        break;
+      case INT:
+        longValue = (Integer) value;
+        break;
+      case BIGINT:
+        return this;
+      case VARCHAR:
+        try {
+          longValue = Long.parseLong((String) value);
+        } catch (NumberFormatException e) {
+          throw conversionError(MinorType.BIGINT);
+        }
+        break;
+      case VARDECIMAL:
+        try {
+          longValue = ((BigDecimal) value).longValueExact();
+        } catch (ArithmeticException e) {
+          throw conversionError(MinorType.BIGINT);
+        }
+        break;
+      default:
+        throw conversionError(MinorType.BIGINT);
+    }
+    return new ConstantHolder(MinorType.BIGINT, longValue);
+  }
+
+  public ConstantHolder toTimestamp(String tz) {
+    long longValue;
+    switch (type) {
+      case TIMESTAMP:
+        return this;
+      case INT:
+        longValue = (Integer) value;
+        break;
+      case BIGINT:
+        longValue = (Long) value;
+        break;
+      case VARCHAR: {
+        DateTimeFormatter format = ISODateTimeFormat.dateTimeNoMillis();
+        if (tz != null) {
+          format = format.withZone(DateTimeZone.forID(tz));
+        }
+        try {
+          longValue = format.parseDateTime((String) value).getMillis();
+        } catch (Exception e) {
+          throw conversionError(MinorType.TIMESTAMP);
+        }
+        break;
+      }
+      default:
+        throw conversionError(MinorType.TIMESTAMP);
+    }
+    return new ConstantHolder(MinorType.TIMESTAMP, longValue);
+  }
+
+  /**
+   * Convert the value to a String. Consider this a debug tool, as
+   * no attempt is made to format values in any particular way. Date, time,
+   * interval and bit values will appear as numbers, which is probably
+   * not what most target systems expect.
+   * @return a VARCHAR constant whose value comes from calling
+   * {@code toString()} on the original value
+   */
+  public ConstantHolder toVarChar() {
+    if (type == MinorType.VARCHAR) {
+      return this;
+    } else {
+      return new ConstantHolder(MinorType.VARCHAR, value.toString());
+    }
+  }
+
+  public ConstantHolder toFloat() {
+    float floatValue;
+    switch (type) {
+    case SMALLINT:
+      floatValue = (Short) value;
+      break;
+    case BIGINT:
+      floatValue = (Long) value;
+      break;
+    case INT:
+      floatValue = (Integer) value;
+      break;
+    case FLOAT4:
+      return this;
+    case VARCHAR:
+      try {
+        floatValue = Float.parseFloat((String) value);
+      } catch (Exception e) {
+        throw conversionError(MinorType.FLOAT4);
+      }
+      break;
+    case VARDECIMAL:
+      floatValue = ((BigDecimal) value).floatValue();
+      break;
+    default:
+      throw conversionError(MinorType.FLOAT4);
+    }
+    return new ConstantHolder(MinorType.FLOAT4, floatValue);
+  }
+
+  public ConstantHolder toDouble() {
+    double doubleValue;
+    switch (type) {
+    case SMALLINT:
+      doubleValue = (Short) value;
+      break;
+    case INT:
+      doubleValue = (Integer) value;
+      break;
+    case BIGINT:
+      doubleValue = (Long) value;
+      break;
+    case FLOAT4:
+      doubleValue = (Float) value;
+      break;
+    case FLOAT8:
+      return this;
+    case VARCHAR:
+      try {
+        doubleValue = Double.parseDouble((String) value);
+      } catch (Exception e) {
+        throw conversionError(MinorType.FLOAT8);
+      }
+      break;
+    case VARDECIMAL:
+      doubleValue = ((BigDecimal) value).doubleValue();
+      break;
+    default:
+      throw conversionError(MinorType.FLOAT8);
+    }
+    return new ConstantHolder(MinorType.FLOAT8, doubleValue);
+  }
+
+  public ConstantHolder toDecimal() {
+    BigDecimal decimalValue;
+    switch (type) {
+    case SMALLINT:
+      decimalValue = BigDecimal.valueOf((Short) value);
+      break;
+    case INT:
+      decimalValue = BigDecimal.valueOf((Integer) value);
+      break;
+    case BIGINT:
+      decimalValue = BigDecimal.valueOf((Long) value);
+      break;
+    case FLOAT4:
+      decimalValue = BigDecimal.valueOf((Float) value);
+      break;
+    case FLOAT8:
+      decimalValue = BigDecimal.valueOf((Double) value);
+      break;
+    case VARCHAR:
+      try {
+        decimalValue = new BigDecimal((String) value);
+      } catch (Exception e) {
+        throw conversionError(MinorType.VARDECIMAL);
+      }
+      break;
+    case VARDECIMAL:
+      return this;
+    default:
+      throw conversionError(MinorType.VARDECIMAL);
+    }
+    return new ConstantHolder(MinorType.VARDECIMAL, decimalValue);
+  }
+
+  public RuntimeException conversionError(MinorType toType) {
+    return new IllegalStateException(String.format(
+        "Cannot convert a constant %s of type %s to type %s",
+        value.toString(), type.name(), toType.name()));
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder("Constant")
+      .field("type", type.name())
+      .field("value", value)
+      .toString();
+  }
+
+  @Override
+  public int compareTo(ConstantHolder other) {
+    Preconditions.checkArgument(type == other.type);
+    switch (type) {
+    case BIGINT:
+      return Long.compare((Long) value, (Long) other.value);
+    case BIT:
+      return Boolean.compare((Boolean) value, (Boolean) other.value);
+    case FLOAT4:
+      return Float.compare((Float) value, (Float) other.value);
+    case FLOAT8:
+      return Double.compare((Double) value, (Double) other.value);
+    case INT:
+      return Integer.compare((Integer) value, (Integer) other.value);
+    case VARCHAR:
+      return ((String) value).compareTo((String) other.value);
+    case VARDECIMAL:
+      return ((BigDecimal) value).compareTo((BigDecimal) other.value);
+    default:
+      throw new UnsupportedOperationException(
+          String.format(
+              "Unsupported comparison between types %s and %s. Convert values first.",
+              type.name(), other.type.name()));
+    }
+  }
+}
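
A short usage sketch (hypothetical, not part of the commit) showing the
contract difference between convertTo() and normalize(): the former throws
on an illegal conversion, the latter returns null, which push-down code
uses to leave a predicate in the Drill plan rather than fail.

    import org.apache.drill.common.types.TypeProtos.MinorType;
    import org.apache.drill.exec.store.http.filter.ConstantHolder;

    public class ConstantHolderSketch {
      public static void main(String[] args) {
        // Legal conversion: VARCHAR "123" becomes an INT constant.
        ConstantHolder asInt =
            ConstantHolder.varcharValue("123").convertTo(MinorType.INT);
        System.out.println(asInt.type + " = " + asInt.value); // INT = 123

        // Illegal conversion: normalize() swallows the error, returns null.
        ConstantHolder bad =
            ConstantHolder.varcharValue("abc").normalize(MinorType.INT);
        System.out.println(bad); // null
      }
    }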
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/filter/ExprNode.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/filter/ExprNode.java
new file mode 100644
index 0000000..e841077
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/filter/ExprNode.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.http.filter;
+
+import java.util.List;
+
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+
+/**
+ * Condensed form of a Drill WHERE clause expression
+ * node. Models only those nodes that are typically used
+ * in rewrite-style filter push-down. Not intended for
+ * the more complex interpreted form of filter push-down
+ * such as that needed for directory-based partitions.
+ * <p>
+ * Each node references the source Calcite node as well as
+ * the selectivity that Calcite attaches to the node. Any
+ * expressions pushed to a scan must reduce the scan
+ * cost by the amount of the selectivity, else Calcite will
+ * conclude that the original plan (without push-down) is
+ * cheaper.
+ */
+public abstract class ExprNode {
+
+  @JsonIgnore
+  private RexNode rexNode;
+
+  public ExprNode() {
+    rexNode = null;
+  }
+
+  public void tag(RexNode rexNode) {
+    this.rexNode = rexNode;
+  }
+
+  public RexNode rexNode() { return rexNode; }
+
+  public abstract double selectivity();
+
+  public interface ColumnTypeNode {
+    String colName();
+    MinorType type();
+  }
+
+  /**
+   * An expression node with an unlimited set of children.
+   */
+  public abstract static class ListNode extends ExprNode {
+    @JsonProperty("children")
+    public final List<ExprNode> children;
+
+    public ListNode(List<ExprNode> children) {
+      this.children = children;
+    }
+  }
+
+  /**
+   * Represents a set of AND'ed expressions in Conjunctive Normal
+   * Form (CNF). Typically the WHERE clause is rewritten to gather
+   * all expressions into a single CNF node. The children are often
+   * called "conjuncts."
+   */
+  public static class AndNode extends ListNode {
+
+    @JsonCreator
+    public AndNode(@JsonProperty("children") List<ExprNode> children) {
+      super(children);
+    }
+
+    @Override
+    public double selectivity() {
+      double selectivity = 1;
+      for (ExprNode child : children) {
+        selectivity *= child.selectivity();
+      }
+      return selectivity;
+    }
+  }
+
+  /**
+   * Represents a set of OR'ed expressions in Disjunctive Normal
+   * Form (DNF). OR'ed expressions often cannot be pushed down except in the
+   * special case of a series of expressions on the same column, which can
+   * sometimes be handled as a series of requests, each for a different disjunct.
+   */
+  public static class OrNode extends ListNode {
+
+    @JsonCreator
+    public OrNode(@JsonProperty("children") List<ExprNode> children) {
+      super(children);
+    }
+
+    /**
+     * Compute the selectivity of this DNF (OR) clause. Drill assumes
+     * the selectivity of = is 0.15. An OR is a series of equality terms,
+     * so selectivity is n * 0.15. However, total selectivity is capped at
+     * 0.9 (that is, if there are more than six disjuncts in the DNF, assume
+     * the filter still provides at least some reduction.)
+     *
+     * @return the estimated selectivity of this DNF clause
+     */
+    @Override
+    public double selectivity() {
+      if (children.isEmpty()) {
+        return 1.0;
+      }
+      double selectivity = 0;
+      for (ExprNode child : children) {
+        selectivity += child.selectivity();
+      }
+      return Math.min(0.9, selectivity);
+    }
+  }
+
+  public abstract static class RelOpNode extends ExprNode {
+    @JsonProperty("op")
+    public final RelOp op;
+
+    public RelOpNode(RelOp op) {
+      this.op = op;
+    }
+
+    @Override
+    public double selectivity() {
+      return op.selectivity();
+    }
+  }
+
+  /**
+   * Semanticized form of a Calcite relational operator. Abstracts
+   * out the Drill implementation details to capture just the
+   * column name, operator and value. Supports only expressions
+   * of the form:<br>
+   * {@code <column> <relop> <const>}<br>
+   * Where the column is a simple name (not an array or map reference),
+   * the relop is one of a defined set, and the constant is one
+   * of the defined Drill types.
+   * <p>
+   * (The driver will convert expressions of the form:<br>
+   * {@code <const> <relop> <column>}<br>
+   * into the normalized form represented here.)
+   */
+  @JsonInclude(Include.NON_NULL)
+  @JsonPropertyOrder({"colName", "op", "value"})
+  public static class ColRelOpConstNode extends RelOpNode {
+    @JsonProperty("colName")
+    public final String colName;
+    @JsonProperty("value")
+    public final ConstantHolder value;
+
+    @JsonCreator
+    public ColRelOpConstNode(
+        @JsonProperty("colName") String colName,
+        @JsonProperty("op") RelOp op,
+        @JsonProperty("value") ConstantHolder value) {
+      super(op);
+      Preconditions.checkArgument(op.argCount() == 1 || value != null);
+      this.colName = colName;
+      this.value = value;
+    }
+
+    /**
+     * Rewrite the RelOp with a normalized value.
+     *
+     * @param from the original RelOp
+     * @param value the new value with a different type and matching
+     * value
+     */
+    public ColRelOpConstNode(ColRelOpConstNode from, ConstantHolder value) {
+      super(from.op);
+      Preconditions.checkArgument(from.op.argCount() == 2);
+      this.colName = from.colName;
+      this.value = value;
+    }
+
+    /**
+     * Rewrite a relop using the given normalized value.
+     *
+     * @return a new RelOp with the normalized value. Will be the same relop
+     * if the normalized value is the same as the unnormalized value.
+     */
+    public ColRelOpConstNode normalize(ConstantHolder normalizedValue) {
+      if (value == normalizedValue) {
+        return this;
+      }
+      return new ColRelOpConstNode(this, normalizedValue);
+    }
+
+    public ColRelOpConstNode rewrite(String newName, ConstantHolder newValue) {
+      if (value == newValue && colName.equals(newName)) {
+        return this;
+      }
+      return new ColRelOpConstNode(newName, op, newValue);
+    }
+
+    @Override
+    public String toString() {
+      PlanStringBuilder builder = new PlanStringBuilder(this)
+        .field("op", op.name())
+        .field("colName", colName);
+      if (value != null) {
+        builder.field("type", value.type.name())
+               .field("value", value.value);
+      }
+      return builder.toString();
+    }
+  }
+}
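
A quick worked example of the selectivity math above: an AndNode with two
equality children yields 0.15 * 0.15 = 0.0225, while an OrNode with three
equality children yields min(0.9, 3 * 0.15) = 0.45. The 0.9 cap in OrNode
takes effect only at seven or more disjuncts (7 * 0.15 = 1.05, capped to 0.9).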
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/filter/FilterPushDownListener.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/filter/FilterPushDownListener.java
new file mode 100644
index 0000000..d971a86
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/filter/FilterPushDownListener.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.http.filter;
+
+import java.util.List;
+
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Pair;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.store.http.filter.ExprNode.AndNode;
+
+/**
+ * Call-back (listener) implementation for a push-down filter.
+ * Abstracts away the common work; plugins implement this class
+ * to do work specific to the plugin.
+ * <p>
+ * Supports two kinds of filter push down:
+ * <dl>
+ * <dt>Conjunctive Normal Form (CNF)</dt>
+ * <dd>A series of expressions joined by an AND: the scan should
+ * produce only rows that satisfy all the conditions.</dd>
+ * <dt>Disjunctive Normal Form (DNF)</dt>
+ * <dd>A series of alternative values for a single column, essentially
+ * a set of expressions joined by OR. The scan splits into multiple
+ * scans, each scanning one of the partitions (or regions or
+ * queries) identified by the case. This is an implementation of the
+ * SQL {@code IN} clause.</dd>
+ * </dl>
+ * <p>
+ * In both cases, the conditions are in the form of a
+ * {@link ExprNode.ColRelOpConstNode} in which one side refers to a column in the scan
+ * and the other is a constant expression. The "driver" will ensure
+ * the rel op is of the correct form; this class ensures that the
+ * column is valid for the scan and the type of the value matches the
+ * column type (or can be converted.)
+ * <p>
+ * The DNF form further ensures that all rel ops refer to the same
+ * column, and that only the equality operator appears in the
+ * terms.
+ */
+public interface FilterPushDownListener {
+
+  /**
+   * @return a prefix to display in filter rules
+   */
+  String prefix();
+
+  /**
+   * Broad check to see if the scan is of the correct type for this
+   * listener. Generally implemented as: <code><pre>
+   * public boolean isTargetScan(GroupScan groupScan) {
+   *   return groupScan instanceof MyGroupScan;
+   * }
+   * </pre></code>
+   * @return true if the given group scan is one this listener can
+   * handle, false otherwise
+   */
+  boolean isTargetScan(GroupScan groupScan);
+
+  /**
+   * Check if the filter rule should be applied to the target group scan,
+   * and if so, return the builder to use.
+   * <p>
+   * Calcite will run this rule multiple times during planning, but the
+   * transform only needs to occur once.
+     * This method lets the group scan record, in its own way, whether the
+     * rule has already been applied.
+   *
+   * @param groupScan the scan node
+   * @return builder instance if the push-down should be applied,
+   * null otherwise
+   */
+  ScanPushDownListener builderFor(GroupScan groupScan);
+
+  /**
+   * Listener for one specific group scan.
+   */
+  public interface ScanPushDownListener {
+
+    /**
+     * Determine whether the given relational operator (which is already in
+     * the form {@code <col name> <relop> <const>}) qualifies for push-down
+     * for this scan.
+     * <p>
+     * If so, return an equivalent node with the value normalized to what
+     * the plugin needs. The returned value may be the same as the original
+     * one if the value is already normalized.
+     *
+     * @param conjunct the candidate expression, typically a
+     * {@link ExprNode.ColRelOpConstNode} describing a relational operator
+     * @return a normalized node if the expression can be transformed into a
+     * filter push-down, {@code null} if not, in which case the expression
+     * remains in the Drill plan
+     * @see ConstantHolder#normalize(org.apache.drill.common.types.TypeProtos.MinorType)
+     */
+    ExprNode accept(ExprNode conjunct);
+
+    /**
+     * Transform a normalized DNF term into a new scan. Normalized form is:
+     * <br><code><pre>
+     * (a AND b AND (x OR y))</pre></code><br>
+     * In which each {@code OR} term represents a scan partition. It
+     * is up to the code here to determine if the scan partition can be handled,
+     * corresponds to a storage partition, or can be done as a separate
+     * scan (as for a JDBC or REST plugin, say.)
+     * <p>
+     * Each term is accompanied by the Calcite expression from which it was
+     * derived. The caller is responsible for determining which expressions,
+     * if any, to leave in the query by returning a list AND'ed (CNF) terms
+     * to leave in the query. Those terms can be the ones passed in, or
+     * new terms to handle special needs.
+     *
+     * @param expr the normalized CNF (AND) node whose children are the
+     * accepted terms, each tagged with the Calcite node from which it was
+     * derived. A child may itself be a DNF (OR) node; an OR node appears
+     * only when it represents a simple list of values (all OR clauses on
+     * the same column) and is AND'ed with the other CNF terms.
+     * @return a pair of elements: a new scan (that represents the pushed filters),
+     * and the original or new expression to appear in the WHERE clause
+     * joined by AND with any non-candidate expressions. That is, if analysis
+     * determines that the plugin can't handle (or cannot completely handle)
+     * a term, return the Calcite node for that term back as part of the
+     * return value and it will be left in the query. Any Calcite nodes
+     * not returned are removed from the query and it is the scan's responsibility
+     * to handle them. Either the group scan or the list of Calcite nodes
+     * must be non-null. Or, return null if the filter condition can't be handled
+     * and the query should remain unchanged.
+     */
+    Pair<GroupScan, List<RexNode>> transform(AndNode expr);
+  }
+}
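
To make the contract concrete, here is a compilable do-nothing skeleton
(hypothetical; the real implementation in this commit is
HttpPushDownListener): it targets no scans and pushes no filters, but shows
the shape of both interfaces.

    import java.util.List;
    import org.apache.calcite.rex.RexNode;
    import org.apache.calcite.util.Pair;
    import org.apache.drill.exec.physical.base.GroupScan;
    import org.apache.drill.exec.store.http.filter.ExprNode;
    import org.apache.drill.exec.store.http.filter.ExprNode.AndNode;
    import org.apache.drill.exec.store.http.filter.FilterPushDownListener;

    public class NoOpPushDownListener implements FilterPushDownListener {
      @Override
      public String prefix() { return "NoOp"; }

      @Override
      public boolean isTargetScan(GroupScan groupScan) {
        // A real listener tests: groupScan instanceof MyGroupScan
        return false;
      }

      @Override
      public ScanPushDownListener builderFor(GroupScan groupScan) {
        // A real listener returns null here if push-down was already applied.
        return new ScanPushDownListener() {
          @Override
          public ExprNode accept(ExprNode conjunct) {
            return null; // reject all: predicates stay in the Drill plan
          }

          @Override
          public Pair<GroupScan, List<RexNode>> transform(AndNode expr) {
            return null; // abandon push-down; the query remains unchanged
          }
        };
      }
    }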
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/filter/FilterPushDownStrategy.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/filter/FilterPushDownStrategy.java
new file mode 100644
index 0000000..e652fc7
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/filter/FilterPushDownStrategy.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.http.filter;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.util.Pair;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.common.DrillRelOptUtil;
+import org.apache.drill.exec.planner.logical.DrillFilterRel;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.logical.DrillProjectRel;
+import org.apache.drill.exec.planner.logical.DrillScanRel;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.FilterPrel;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.http.filter.ExprNode.AndNode;
+import org.apache.drill.exec.store.http.filter.FilterPushDownListener.ScanPushDownListener;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+
+/**
+ * Generalized filter push-down strategy which performs all the tree-walking
+ * and tree restructuring work, allowing a "listener" to do the work needed
+ * for a particular scan.
+ * <p>
+ * General usage in a storage plugin: <code><pre>
+ * public Set<StoragePluginOptimizerRule> getPhysicalOptimizerRules(
+ *        OptimizerRulesContext optimizerRulesContext) {
+ *   return FilterPushDownStrategy.rulesFor(
+ *      new MyPushDownListener(...));
+ * }
+ * </pre></code>
+ */
+public class FilterPushDownStrategy {
+
+  private static final Collection<String> BANNED_OPERATORS =
+      Collections.singletonList("flatten");
+
+  /**
+   * Base rule that passes target information to the push-down strategy
+   */
+  private static abstract class AbstractFilterPushDownRule extends StoragePluginOptimizerRule {
+
+    protected final FilterPushDownStrategy strategy;
+
+    public AbstractFilterPushDownRule(RelOptRuleOperand operand, String description,
+        FilterPushDownStrategy strategy) {
+      super(operand, description);
+      this.strategy = strategy;
+    }
+  }
+
+  /**
+   * Custom rule passed to Calcite for FILTER --> PROJECT --> SCAN
+   */
+  private static class ProjectAndFilterRule extends AbstractFilterPushDownRule {
+
+    private ProjectAndFilterRule(FilterPushDownStrategy strategy) {
+      super(RelOptHelper.some(FilterPrel.class, RelOptHelper.some(DrillProjectRel.class, RelOptHelper.any(DrillScanRel.class))),
+            strategy.namePrefix() + "PushDownFilter:Filter_On_Project",
+            strategy);
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+      if (!super.matches(call)) {
+        return false;
+      }
+      DrillScanRel scan = call.rel(2);
+      return strategy.isTargetScan(scan);
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      DrillFilterRel filterRel = call.rel(0);
+      DrillProjectRel projectRel = call.rel(1);
+      DrillScanRel scanRel = call.rel(2);
+      strategy.onMatch(call, filterRel, projectRel, scanRel);
+    }
+  }
+
+  /**
+   * Custom rule passed to Calcite to handle FILTER --> SCAN
+   */
+  private static class FilterWithoutProjectRule extends AbstractFilterPushDownRule {
+
+    private FilterWithoutProjectRule(FilterPushDownStrategy strategy) {
+      super(RelOptHelper.some(DrillFilterRel.class, RelOptHelper.any(DrillScanRel.class)),
+            strategy.namePrefix() + "PushDownFilter:Filter_On_Scan",
+            strategy);
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+      if (!super.matches(call)) {
+        return false;
+      }
+      DrillScanRel scan = call.rel(1);
+      return strategy.isTargetScan(scan);
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      DrillFilterRel filterRel = call.rel(0);
+      DrillScanRel scanRel = call.rel(1);
+      strategy.onMatch(call, filterRel, null, scanRel);
+    }
+  }
+
+  /**
+   * Implement filter push-down for one scan.
+   */
+  private static class FilterPushDownBuilder {
+
+    private final RelOptRuleCall call;
+    private final DrillFilterRel filter;
+    private final DrillProjectRel project;
+    private final DrillScanRel scan;
+    private final ScanPushDownListener scanListener;
+    // Predicates which cannot be converted to a filter predicate
+    private final List<RexNode> nonConvertedPreds = new ArrayList<>();
+
+    protected FilterPushDownBuilder(RelOptRuleCall call, DrillFilterRel filter, DrillProjectRel project, DrillScanRel scan, ScanPushDownListener scanListener) {
+      this.call = call;
+      this.filter = filter;
+      this.project = project;
+      this.scan = scan;
+      this.scanListener = scanListener;
+    }
+
+    void apply() {
+      AndNode cnfNode = sortPredicates();
+      if (cnfNode == null) {
+        return;
+      }
+
+      Pair<GroupScan, List<RexNode>> translated =
+          scanListener.transform(cnfNode);
+
+      // Listener abandoned effort. (Allows a stub early in development.)
+      if (translated == null) {
+        return;
+      }
+
+      // Listener rejected the DNF terms
+      GroupScan newGroupScan = translated.left;
+      if (newGroupScan == null) {
+        return;
+      }
+
+      // Gather unqualified and rewritten predicates
+      List<RexNode> remainingPreds = new ArrayList<>();
+      remainingPreds.addAll(nonConvertedPreds);
+      if (translated.right != null) {
+        remainingPreds.addAll(translated.right);
+      }
+
+      // Replace the child with the new filter on top of the child/scan
+      call.transformTo(rebuildTree(newGroupScan, remainingPreds));
+    }
+
+    private AndNode sortPredicates() {
+
+      // Get the filter expression
+      RexNode condition;
+      if (project == null) {
+        condition = filter.getCondition();
+      } else {
+        // get the filter as if it were below the projection.
+        condition = RelOptUtil.pushPastProject(filter.getCondition(), project);
+      }
+
+      // Skip if no expression or expression is trivial.
+      // This seems to never happen because Calcite optimizes away
+      // any expression of the form WHERE true, 1 = 1 or 0 = 1.
+      if (condition == null || condition.isAlwaysTrue() || condition.isAlwaysFalse()) {
+        return null;
+      }
+
+      // Break the filter condition into a list of conjuncts. Any conjunct that
+      // refers to an ITEM or FLATTEN expression cannot be pushed down; the
+      // rest are candidates for push-down.
+      List<RexNode> filterPreds = RelOptUtil.conjunctions(
+          RexUtil.toCnf(filter.getCluster().getRexBuilder(), condition));
+
+      DrillParseContext parseContext = new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner()));
+      List<ExprNode> conjuncts = new ArrayList<>();
+      for (RexNode pred : filterPreds) {
+        ExprNode conjunct = identifyCandidate(parseContext, scan, pred);
+        if (conjunct == null) {
+          nonConvertedPreds.add(pred);
+        } else {
+          conjunct.tag(pred);
+          conjuncts.add(conjunct);
+        }
+      }
+      return conjuncts.isEmpty() ? null : new AndNode(conjuncts);
+    }
+
+    public ExprNode identifyCandidate(DrillParseContext parseContext, DrillScanRel scan, RexNode pred) {
+      if (DrillRelOptUtil.findOperators(pred, Collections.emptyList(), BANNED_OPERATORS) != null) {
+        return null;
+      }
+
+      // Extract an AND term, which may be an OR expression.
+      LogicalExpression drillPredicate = DrillOptiq.toDrill(parseContext, scan, pred);
+      ExprNode expr = drillPredicate.accept(FilterPushDownUtils.REL_OP_EXTRACTOR, null);
+      if (expr == null) {
+        return null;
+      }
+
+      // Check if each term can be pushed down, and, if so, return a new RelOp
+      // with the value normalized.
+      return scanListener.accept(expr);
+    }
+
+    /**
+     * Rebuilds the query plan subtree to include any substitutions and removals requested
+     * by the listener.
+     *
+     * @param newGroupScan the replacement group scan given by the listener,
+     * or null to keep the original scan
+     * @param remainingPreds the Calcite predicates which the listener does
+     * <i>not</i> handle and which should remain in the plan tree
+     * @return a rebuilt query subtree
+     */
+    private RelNode rebuildTree(GroupScan newGroupScan, List<RexNode> remainingPreds) {
+
+      // Rebuild the subtree with transformed nodes.
+
+      // Scan: new if available, else existing.
+      RelNode newNode;
+      if (newGroupScan == null) {
+        newNode = scan;
+      } else {
+        newNode = new DrillScanRel(scan.getCluster(), scan.getTraitSet(), scan.getTable(),
+            newGroupScan, scan.getRowType(), scan.getColumns());
+      }
+
+      // Copy project, if exists
+      if (project != null) {
+        newNode = project.copy(project.getTraitSet(), Collections.singletonList(newNode));
+      }
+
+      // Add filter, if any predicates remain.
+      if (!remainingPreds.isEmpty()) {
+
+        // If some predicates were not pushed down, create a new filter with
+        // them on top of the (possibly new) scan, e.g.
+        // FILTER(a, b, c) --> SCAN becomes FILTER(a) --> SCAN with b and c
+        // pushed into the scan.
+        newNode = filter.copy(filter.getTraitSet(), newNode,
+            RexUtil.composeConjunction(
+                filter.getCluster().getRexBuilder(),
+                remainingPreds,
+                true));
+      }
+
+      return newNode;
+    }
+  }
+
+  private final FilterPushDownListener listener;
+
+  public FilterPushDownStrategy(FilterPushDownListener listener) {
+    this.listener = listener;
+  }
+
+  public Set<StoragePluginOptimizerRule> rules() {
+    return ImmutableSet.of(
+        new ProjectAndFilterRule(this),
+        new FilterWithoutProjectRule(this));
+  }
+
+  public static Set<StoragePluginOptimizerRule> rulesFor(
+      FilterPushDownListener listener) {
+    return new FilterPushDownStrategy(listener).rules();
+  }
+
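+  // A plugin would typically register these rules from its getOptimizerRules()
+  // override during a filter push-down planning phase, for example:
+  //   FilterPushDownStrategy.rulesFor(new HttpPushDownListener(...))
+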
+  private String namePrefix() { return listener.prefix(); }
+
+  private boolean isTargetScan(DrillScanRel scan) {
+    return listener.isTargetScan(scan.getGroupScan());
+  }
+
+  public void onMatch(RelOptRuleCall call, DrillFilterRel filter, DrillProjectRel project, DrillScanRel scan) {
+
+    // The listener returns null to decline the push-down, such as when
+    // the rule has already been applied to this scan.
+    ScanPushDownListener scanListener = listener.builderFor(scan.getGroupScan());
+    if (scanListener != null) {
+      new FilterPushDownBuilder(call, filter, project, scan, scanListener).apply();
+    }
+  }
+}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/filter/FilterPushDownUtils.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/filter/FilterPushDownUtils.java
new file mode 100644
index 0000000..9005133
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/filter/FilterPushDownUtils.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.http.filter;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.calcite.util.Pair;
+import org.apache.drill.common.FunctionNames;
+import org.apache.drill.common.expression.BooleanOperator;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions.BooleanExpression;
+import org.apache.drill.common.expression.ValueExpressions.DateExpression;
+import org.apache.drill.common.expression.ValueExpressions.DoubleExpression;
+import org.apache.drill.common.expression.ValueExpressions.FloatExpression;
+import org.apache.drill.common.expression.ValueExpressions.IntExpression;
+import org.apache.drill.common.expression.ValueExpressions.IntervalDayExpression;
+import org.apache.drill.common.expression.ValueExpressions.IntervalYearExpression;
+import org.apache.drill.common.expression.ValueExpressions.LongExpression;
+import org.apache.drill.common.expression.ValueExpressions.QuotedString;
+import org.apache.drill.common.expression.ValueExpressions.TimeExpression;
+import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression;
+import org.apache.drill.common.expression.ValueExpressions.VarDecimalExpression;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.planner.PlannerPhase;
+import org.apache.drill.exec.store.http.filter.ExprNode.AndNode;
+import org.apache.drill.exec.store.http.filter.ExprNode.ColRelOpConstNode;
+import org.apache.drill.exec.store.http.filter.ExprNode.OrNode;
+
+public class FilterPushDownUtils {
+
+  /**
+   * Extracts a constant from an argument: accepts literals; rejects
+   * expressions, columns and so on.
+   * <p>
+   * The core types (INT, BIGINT, BIT (Boolean), VARCHAR and VARDECIMAL) are
+   * known to work. The others may or may not work depending on Drill's
+   * parser/planner; testing is needed.
+   */
+  private static class ConstantExtractor extends AbstractExprVisitor<ConstantHolder, Void, RuntimeException> {
+
+    @Override
+    public ConstantHolder visitIntConstant(IntExpression expr, Void value) throws RuntimeException {
+      return new ConstantHolder(MinorType.INT, expr.getInt());
+    }
+
+    @Override
+    public ConstantHolder visitLongConstant(LongExpression expr, Void value) throws RuntimeException {
+      return new ConstantHolder(MinorType.BIGINT, expr.getLong());
+    }
+
+    @Override
+    public ConstantHolder visitBooleanConstant(BooleanExpression expr, Void value) throws RuntimeException {
+      return new ConstantHolder(MinorType.BIT, expr.getBoolean());
+    }
+
+    @Override
+    public ConstantHolder visitQuotedStringConstant(QuotedString expr, Void value) throws RuntimeException {
+      return new ConstantHolder(MinorType.VARCHAR, expr.getString());
+    }
+
+    // Float is mapped to double for storage to simplify clients.
+    // It is not clear that Drill generates floats rather than doubles.
+    @Override
+    public ConstantHolder visitFloatConstant(FloatExpression expr, Void value) throws RuntimeException {
+      return new ConstantHolder(MinorType.FLOAT8, (double) expr.getFloat());
+    }
+
+    // Seems to not be used. Anything float-like is instead represented as a
+    // VarDecimal constant.
+    @Override
+    public ConstantHolder visitDoubleConstant(DoubleExpression expr, Void value) throws RuntimeException {
+      return new ConstantHolder(MinorType.FLOAT8, expr.getDouble());
+    }
+
+    // Only VARDECIMAL is handled; the legacy decimal types are no longer
+    // supported, so their visitors are not implemented.
+    @Override
+    public ConstantHolder visitVarDecimalConstant(VarDecimalExpression expr, Void value) throws RuntimeException {
+      return new ConstantHolder(MinorType.VARDECIMAL, expr.getBigDecimal());
+    }
+
+    // Example: DATE '2008-2-23'
+    @Override
+    public ConstantHolder visitDateConstant(DateExpression expr, Void value) throws RuntimeException {
+      return new ConstantHolder(MinorType.DATE, expr.getDate());
+    }
+
+    // Example: TIME '12:23:34'
+    @Override
+    public ConstantHolder visitTimeConstant(TimeExpression expr, Void value) throws RuntimeException {
+      return new ConstantHolder(MinorType.TIME, expr.getTime());
+    }
+
+    // Example: TIMESTAMP '2008-2-23 12:23:34.456'
+    @Override
+    public ConstantHolder visitTimeStampConstant(TimeStampExpression expr, Void value) throws RuntimeException {
+      return new ConstantHolder(MinorType.TIMESTAMP, expr.getTimeStamp());
+    }
+
+    // Example: INTERVAL '1' YEAR
+    @Override
+    public ConstantHolder visitIntervalYearConstant(IntervalYearExpression expr, Void value) throws RuntimeException {
+      return new ConstantHolder(MinorType.INTERVALYEAR, expr.getIntervalYear());
+    }
+
+    // Example: INTERVAL '1 10:20:30' DAY TO SECOND
+    // This field has two parts, encoded as a Pair.
+    @Override
+    public ConstantHolder visitIntervalDayConstant(IntervalDayExpression expr, Void value) throws RuntimeException {
+      return new ConstantHolder(MinorType.INTERVALDAY,
+          Pair.of(expr.getIntervalDay(), expr.getIntervalMillis()));
+    }
+
+    @Override
+    public ConstantHolder visitUnknown(LogicalExpression e, Void valueArg) throws RuntimeException {
+      return null;
+    }
+  }
+
+  /**
+   * Extracts the column name from an argument. Returns null if the argument
+   * is not a column, or is a complex column reference (a[10], a.b).
+   */
+  private static class ColRefExtractor extends AbstractExprVisitor<String, Void, RuntimeException> {
+
+    @Override
+    public String visitSchemaPath(SchemaPath path, Void value) throws RuntimeException {
+
+      // Can't handle names such as a.b or a[10]
+      if (!path.isLeaf()) {
+        return null;
+      }
+
+      // Can only handle columns known to the scan
+      return path.getRootSegmentPath();
+    }
+
+    @Override
+    public String visitUnknown(LogicalExpression e, Void valueArg) throws RuntimeException {
+      return null;
+    }
+  }
+
+  /**
+   * Extract a relational operator of the pattern<br>
+   * <tt>&lt;col> &lt;relop> &lt;const></tt> or<br>
+   * <tt>&lt;col> &lt;relop></tt>.
+   */
+  private static class RelOpExtractor extends AbstractExprVisitor<ExprNode, Void, RuntimeException> {
+
+    @Override
+    public ExprNode visitBooleanOperator(BooleanOperator op, Void value) throws RuntimeException {
+      switch (op.getName()) {
+        case FunctionNames.OR:
+        case FunctionNames.AND:
+          break;
+        default:
+          return null;
+      }
+
+      List<ExprNode> args = op.args()
+          .stream()
+          .map(a -> a.accept(this, null))
+          .collect(Collectors.toList());
+      switch (op.getName()) {
+        case FunctionNames.OR:
+          return new OrNode(args);
+        case FunctionNames.AND:
+          return new AndNode(args);
+        default:
+          return null;
+      }
+    }
+
+    @Override
+    public ExprNode visitFunctionCall(FunctionCall call, Void value) throws RuntimeException {
+
+      RelOp op;
+      switch (call.getName()) {
+      case FunctionNames.EQ:
+        op = RelOp.EQ;
+        break;
+      case FunctionNames.NE:
+        op = RelOp.NE;
+        break;
+      case FunctionNames.LT:
+        op = RelOp.LT;
+        break;
+      case FunctionNames.LE:
+        op = RelOp.LE;
+        break;
+      case FunctionNames.GT:
+        op = RelOp.GT;
+        break;
+      case FunctionNames.GE:
+        op = RelOp.GE;
+        break;
+      case FunctionNames.IS_NULL:
+        op = RelOp.IS_NULL;
+        break;
+      case FunctionNames.IS_NOT_NULL:
+        op = RelOp.IS_NOT_NULL;
+        break;
+      default:
+        return null;
+      }
+
+      if (op.argCount() == 1) {
+        return checkCol(op, call);
+      } else {
+        ExprNode relOpNode = checkColOpConst(op, call);
+        if (relOpNode == null) {
+          relOpNode = checkConstOpCol(op, call);
+        }
+        return relOpNode;
+      }
+    }
+
+    /**
+     * Check just the one argument for a unary operator:
+     * IS NULL, IS NOT NULL.
+     */
+    private ExprNode checkCol(RelOp op, FunctionCall call) {
+      String colName = call.arg(0).accept(COL_REF_EXTRACTOR, null);
+      if (colName == null) {
+        return null;
+      }
+
+      return new ColRelOpConstNode(colName, op, null);
+    }
+
+    /**
+     * Extracts a relational operator of the "normal" form of:<br>
+     * <tt>&lt;col> &lt;relop> &lt;const>.
+     */
+    private ExprNode checkColOpConst(RelOp op, FunctionCall call) {
+      String colName = call.arg(0).accept(COL_REF_EXTRACTOR, null);
+      if (colName == null) {
+        return null;
+      }
+
+      ConstantHolder constArg = call.arg(1).accept(CONSTANT_EXTRACTOR, null);
+      if (constArg == null) {
+        return null;
+      }
+
+      return new ColRelOpConstNode(colName, op, constArg);
+    }
+
+    /**
+     * Extracts a relational operator of the "reversed" form of:<br>
+     * <tt>&lt;const> &lt;relop> &lt;col>. (Unfortunately, Calcite
+     * does not normalize predicates.) Reverses the sense of the
+     * relational operator to put the predicate into normalized
+     * form.
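+     * For example, {@code 10 >= a} becomes {@code a <= 10}.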
+     */
+    private ExprNode checkConstOpCol(RelOp op, FunctionCall call) {
+      ConstantHolder constArg = call.arg(0).accept(CONSTANT_EXTRACTOR, null);
+      if (constArg == null) {
+        return null;
+      }
+
+      String colName = call.arg(1).accept(COL_REF_EXTRACTOR, null);
+      if (colName == null) {
+        return null;
+      }
+
+      return new ColRelOpConstNode(colName, op.invert(), constArg);
+    }
+
+    @Override
+    public ExprNode visitUnknown(LogicalExpression e, Void value) throws RuntimeException {
+      // Catches anything not recognized above.
+      return null;
+    }
+  }
+
+  private static final ConstantExtractor CONSTANT_EXTRACTOR = new ConstantExtractor();
+
+  private static final ColRefExtractor COL_REF_EXTRACTOR = new ColRefExtractor();
+
+  public static final RelOpExtractor REL_OP_EXTRACTOR = new RelOpExtractor();
+
+  /**
+   * Filter push-down is best done during logical planning so that the result can
+   * influence parallelization in the physical phase. The specific phase differs
+   * depending on which planning mode is enabled. This check hides those details
+   * from storage plugins that simply want to know "should I add my filter
+   * push-down rules in the given phase?"
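+   * <p>
+   * A typical storage plugin calls this from its
+   * {@code getOptimizerRules(OptimizerRulesContext, PlannerPhase)} override,
+   * returning its filter push-down rules only when this method returns true.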
+   *
+   * @return true if filter push-down rules should be applied in this phase
+   */
+  public static boolean isFilterPushDownPhase(PlannerPhase phase) {
+    switch (phase) {
+    case LOGICAL_PRUNE_AND_JOIN: // HEP is disabled
+    case PARTITION_PRUNING:      // HEP partition push-down enabled
+    case LOGICAL_PRUNE:          // HEP partition push-down disabled
+      return true;
+    default:
+      return false;
+    }
+  }
+}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/filter/RelOp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/filter/RelOp.java
new file mode 100644
index 0000000..9b59585
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/filter/RelOp.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.http.filter;
+
+/**
+ * Fixed set of Drill relational operators, using well-defined
+ * names. Distilled from the more general string function names
+ * used in the query plan tree.
+ */
+public enum RelOp {
+  // Order of LT, LE, GE, GT is important for
+  // value comparisons
+  EQ, NE, LT, LE, GE, GT, IS_NULL, IS_NOT_NULL;
+
+  /**
+   * Return the result of flipping the sides of an
+   * expression:<br>
+   * {@code a op b} &rarr; {@code b op.invert() a}
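+   * <p>
+   * For example, Calcite may present {@code 5 < a}; inverting
+   * {@code LT} to {@code GT} normalizes it to {@code a > 5}.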
+   *
+   * @return a new relop resulting from flipping the sides
+   * of the expression, or this relop if the operation
+   * is symmetric.
+   */
+  public RelOp invert() {
+    switch (this) {
+      case LT:
+        return GT;
+      case LE:
+        return GE;
+      case GT:
+        return LT;
+      case GE:
+        return LE;
+      default:
+        return this;
+    }
+  }
+
+  /**
+   * Returns the number of arguments for the relop.
+   * @return 1 for IS (NOT) NULL, 2 otherwise
+   */
+  public int argCount() {
+    switch (this) {
+      case IS_NULL:
+      case IS_NOT_NULL:
+        return 1;
+      default:
+        return 2;
+    }
+  }
+
+  /**
+   * Poor-man's guess at selectivity of each operator.
+   * Should match Calcite's built-in defaults. The Calcite estimates
+   * are not great, but we need to match them.
+   * <p>
+   * If a query has access to metadata, then each predicate should
+   * have a selectivity computed from that metadata. This
+   * mechanism should be extended to include that selectivity as a field
+   * and pass it back from this method.
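+   * <p>
+   * For example, {@code EQ} assumes that a predicate such as {@code a = 10}
+   * matches about 15% of rows, mirroring Calcite's default.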
+   *
+   * @return crude estimate of operator selectivity
+   * @see org.apache.calcite.rel.metadata.RelMdUtil
+   */
+  public double selectivity() {
+    switch (this) {
+      case EQ:
+        return 0.15;
+      case GE:
+      case GT:
+      case LE:
+      case LT:
+      case NE: // Very bad estimate!
+        return 0.5;
+      case IS_NOT_NULL:
+      case IS_NULL:
+        return 0.9;
+      default:
+        return 0.25;
+    }
+  }
+}
\ No newline at end of file
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
index 9bc234f..8918e74 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
@@ -21,19 +21,20 @@ import okhttp3.Authenticator;
 import okhttp3.Cache;
 import okhttp3.Credentials;
 import okhttp3.FormBody;
+import okhttp3.HttpUrl;
 import okhttp3.Interceptor;
 import okhttp3.OkHttpClient;
 import okhttp3.OkHttpClient.Builder;
 import okhttp3.Request;
 import okhttp3.Response;
-
 import okhttp3.Route;
 
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.store.http.HttpAPIConfig;
-import org.apache.drill.exec.store.http.HttpAPIConfig.HttpMethods;
+import org.apache.drill.exec.store.http.HttpApiConfig;
+import org.apache.drill.exec.store.http.HttpApiConfig.HttpMethod;
 import org.apache.drill.exec.store.http.HttpStoragePluginConfig;
+import org.apache.drill.exec.store.http.HttpSubScan;
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,75 +59,22 @@ public class SimpleHttp {
   private static final Logger logger = LoggerFactory.getLogger(SimpleHttp.class);
 
   private final OkHttpClient client;
-  private final HttpStoragePluginConfig config;
-  private final HttpAPIConfig apiConfig;
+  private final HttpSubScan scanDefn;
   private final File tempDir;
   private final HttpProxyConfig proxyConfig;
   private final CustomErrorContext errorContext;
+  private final HttpUrl url;
 
-  public SimpleHttp(HttpStoragePluginConfig config, File tempDir,
-      String connectionName, HttpProxyConfig proxyConfig,
-      CustomErrorContext errorContext) {
-    this.config = config;
+  public SimpleHttp(HttpSubScan scanDefn, HttpUrl url, File tempDir,
+      HttpProxyConfig proxyConfig, CustomErrorContext errorContext) {
+    this.scanDefn = scanDefn;
+    this.url = url;
     this.tempDir = tempDir;
-    this.apiConfig = config.connections().get(connectionName);
     this.proxyConfig = proxyConfig;
     this.errorContext = errorContext;
     this.client = setupHttpClient();
   }
 
-  public InputStream getInputStream(String urlStr) {
-    Request.Builder requestBuilder;
-
-    requestBuilder = new Request.Builder()
-        .url(urlStr);
-
-    // The configuration does not allow for any other request types other than POST and GET.
-    if (apiConfig.getMethodType() == HttpMethods.POST) {
-      // Handle POST requests
-      FormBody.Builder formBodyBuilder = buildPostBody();
-      requestBuilder.post(formBodyBuilder.build());
-    }
-
-    // Add headers to request
-    if (apiConfig.headers() != null) {
-      for (Map.Entry<String, String> entry : apiConfig.headers().entrySet()) {
-        requestBuilder.addHeader(entry.getKey(), entry.getValue());
-      }
-    }
-
-    // Build the request object
-    Request request = requestBuilder.build();
-
-    try {
-      // Execute the request
-      Response response = client
-        .newCall(request)
-        .execute();
-
-      // If the request is unsuccessful, throw a UserException
-      if (!response.isSuccessful()) {
-        throw UserException
-          .dataReadError()
-          .message("Error retrieving data from HTTP Storage Plugin: %d %s",
-              response.code(), response.message())
-          .addContext(errorContext)
-          .build(logger);
-      }
-      logger.debug("HTTP Request for {} successful.", urlStr);
-      logger.debug("Response Headers: {} ", response.headers().toString());
-
-      // Return the InputStream of the response
-      return Objects.requireNonNull(response.body()).byteStream();
-    } catch (IOException e) {
-      throw UserException
-        .dataReadError(e)
-        .message("Error retrieving data from HTTP Storage Plugin: %s", e.getMessage())
-        .addContext(errorContext)
-        .build(logger);
-    }
-  }
-
   /**
    * Configures the OkHTTP3 server object with configuration info from the user.
    *
@@ -138,20 +86,23 @@ public class SimpleHttp {
     // Set up the HTTP Cache.   Future possibilities include making the cache size and retention configurable but
     // right now it is on or off.  The writer will write to the Drill temp directory if it is accessible and
     // output a warning if not.
+    HttpStoragePluginConfig config = scanDefn.tableSpec().config();
     if (config.cacheResults()) {
       setupCache(builder);
     }
 
     // If the API uses basic authentication add the authentication code.
+    HttpApiConfig apiConfig = scanDefn.tableSpec().connectionConfig();
     if (apiConfig.authType().toLowerCase().equals("basic")) {
       logger.debug("Adding Interceptor");
       builder.addInterceptor(new BasicAuthInterceptor(apiConfig.userName(), apiConfig.password()));
     }
 
     // Set timeouts
-    builder.connectTimeout(config.timeout(), TimeUnit.SECONDS);
-    builder.writeTimeout(config.timeout(), TimeUnit.SECONDS);
-    builder.readTimeout(config.timeout(), TimeUnit.SECONDS);
+    int timeout = Math.max(1, config.timeout());
+    builder.connectTimeout(timeout, TimeUnit.SECONDS);
+    builder.writeTimeout(timeout, TimeUnit.SECONDS);
+    builder.readTimeout(timeout, TimeUnit.SECONDS);
 
     // Set the proxy configuration
 
@@ -184,6 +135,67 @@ public class SimpleHttp {
     return builder.build();
   }
 
+  public String url() { return url.toString(); }
+
+  public InputStream getInputStream() {
+
+    Request.Builder requestBuilder = new Request.Builder()
+        .url(url);
+
+    // The configuration allows no request types other than POST and GET.
+    HttpApiConfig apiConfig = scanDefn.tableSpec().connectionConfig();
+    if (apiConfig.getMethodType() == HttpMethod.POST) {
+      // Handle POST requests
+      FormBody.Builder formBodyBuilder = buildPostBody(apiConfig.postBody());
+      requestBuilder.post(formBodyBuilder.build());
+    }
+
+    // Log the URL and method to aid in debugging user issues.
+    logger.info("Connection: {}, Method {}, URL: {}",
+        scanDefn.tableSpec().connection(),
+        apiConfig.getMethodType().name(), url());
+
+    // Add headers to request
+    if (apiConfig.headers() != null) {
+      for (Map.Entry<String, String> entry : apiConfig.headers().entrySet()) {
+        requestBuilder.addHeader(entry.getKey(), entry.getValue());
+      }
+    }
+
+    // Build the request object
+    Request request = requestBuilder.build();
+
+    try {
+      // Execute the request
+      Response response = client
+        .newCall(request)
+        .execute();
+
+      // If the request is unsuccessful, throw a UserException
+      if (!response.isSuccessful()) {
+        throw UserException
+          .dataReadError()
+          .message("HTTP request failed")
+          .addContext("Response code", response.code())
+          .addContext("Response message", response.message())
+          .addContext(errorContext)
+          .build(logger);
+      }
+      logger.debug("HTTP Request for {} successful.", url());
+      logger.debug("Response Headers: {} ", response.headers().toString());
+
+      // Return the InputStream of the response
+      return Objects.requireNonNull(response.body()).byteStream();
+    } catch (IOException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Failed to read the HTTP response body")
+        .addContext("Error message", e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
+    }
+  }
+
   /**
    * Configures response caching using a provided temp directory.
    *
@@ -225,11 +237,11 @@ public class SimpleHttp {
    *
    * @return FormBody.Builder The populated formbody builder
    */
-  private FormBody.Builder buildPostBody() {
+  private FormBody.Builder buildPostBody(String postBody) {
     final Pattern postBodyPattern = Pattern.compile("^.+=.+$");
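+    // For example, a postBody of "key1=value1\nkey2=value2" produces
+    // two form fields: key1=value1 and key2=value2.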
 
     FormBody.Builder formBodyBuilder = new FormBody.Builder();
-    String[] lines = apiConfig.postBody().split("\\r?\\n");
+    String[] lines = postBody.split("\\r?\\n");
     for(String line : lines) {
 
       // If the string is in the format key=value split it,
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
index f99df7c..6434cff 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
@@ -24,9 +24,9 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.io.File;
 import java.io.IOException;
 import java.nio.file.Paths;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -41,7 +41,7 @@ import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
 import org.apache.drill.shaded.guava.com.google.common.io.Files;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterTest;
-import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.RowSetUtilities;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -49,8 +49,6 @@ import org.junit.Test;
 import okhttp3.mockwebserver.MockResponse;
 import okhttp3.mockwebserver.MockWebServer;
 import okhttp3.mockwebserver.RecordedRequest;
-import okio.Buffer;
-import okio.Okio;
 
 /**
  * Tests the HTTP Storage plugin. Since the plugin makes use of REST requests,
@@ -74,34 +72,75 @@ public class TestHttpPlugin extends ClusterTest {
     TEST_JSON_RESPONSE = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/response.json"), Charsets.UTF_8).read();
 
     dirTestWatcher.copyResourceToRoot(Paths.get("data/"));
+    makeLiveConfig();
+    makeMockConfig();
+  }
 
-    Map<String, String> headers = new HashMap<>();
-    headers.put("header1", "value1");
-    headers.put("header2", "value2");
-
-    HttpAPIConfig mockConfig = new HttpAPIConfig("http://localhost:8091/", "GET", headers, "basic", "user", "pass",null);
-
-    HttpAPIConfig sunriseConfig = new HttpAPIConfig("https://api.sunrise-sunset.org/", "GET", null, null, null, null, null);
+  /**
+   * Creates configs against live external servers. These tests must be run
+   * manually and are subject to the whims of the external site. The timeout
+   * is 10 seconds to allow for real-world delays.
+   */
+  private static void makeLiveConfig() {
 
-    HttpAPIConfig stockConfig = new HttpAPIConfig("https://api.worldtradingdata.com/api/v1/stock?symbol=SNAP,TWTR,VOD" +
-      ".L&api_token=zuHlu2vZaehdZN6GmJdTiVlp7xgZn6gl6sfgmI4G6TY4ej0NLOzvy0TUl4D4", "get", null, null, null, null, null);
+    HttpApiConfig sunriseConfig = new HttpApiConfig("https://api.sunrise-sunset.org/json", "GET", null, null, null, null, null, null, null, null);
+    HttpApiConfig sunriseWithParamsConfig = new HttpApiConfig("https://api.sunrise-sunset.org/json", "GET", null, null, null, null, null,
+        Arrays.asList("lat", "lng", "date"), "results", false);
 
-    HttpAPIConfig mockPostConfig = new HttpAPIConfig("http://localhost:8091/", "POST", headers, null, null, null,"key1=value1\nkey2=value2");
+    HttpApiConfig stockConfig = new HttpApiConfig("https://api.worldtradingdata.com/api/v1/stock?symbol=SNAP,TWTR,VOD" +
+      ".L&api_token=zuHlu2vZaehdZN6GmJdTiVlp7xgZn6gl6sfgmI4G6TY4ej0NLOzvy0TUl4D4", "get", null, null, null, null, null, null, null, null);
 
-    Map<String, HttpAPIConfig> configs = new HashMap<>();
+    Map<String, HttpApiConfig> configs = new HashMap<>();
     configs.put("stock", stockConfig);
     configs.put("sunrise", sunriseConfig);
-    configs.put("mock", mockConfig);
+    configs.put("sunrise2", sunriseWithParamsConfig);
+
+    HttpStoragePluginConfig liveStorageConfig = new HttpStoragePluginConfig(false, configs, 10, "", 80, "", "", "");
+    liveStorageConfig.setEnabled(true);
+    cluster.defineStoragePlugin("live", liveStorageConfig);
+  }
+
+  /**
+   * Creates configs for an in-process mock server. Used for normal automated unit
+   * testing. The timeout is short to allow for timeout testing. The mock server is
+   * useful, but won't catch bugs related to real-world server glitches.
+   */
+  private static void makeMockConfig() {
+
+    Map<String, String> headers = new HashMap<>();
+    headers.put("header1", "value1");
+    headers.put("header2", "value2");
+
+    // Use the mock server with HTTP parameters passed as the table name.
+    // The connection acts like a schema.
+    // Ignores the message body except for data.
+    HttpApiConfig mockSchema = new HttpApiConfig("http://localhost:8091/json", "GET", headers,
+        "basic", "user", "pass", null, null, "results", null);
+
+    // Use the mock server with the HTTP parameters passed as WHERE
+    // clause filters. The connection acts like a table.
+    // Ignores the message body except for data.
+    // This is the preferred approach: the base URL contains as much info as possible;
+    // all other parameters are specified in SQL. See README for an example.
+    HttpApiConfig mockTable = new HttpApiConfig("http://localhost:8091/json", "GET", headers,
+        "basic", "user", "pass", null, Arrays.asList("lat", "lng", "date"), "results", false);
+
+    HttpApiConfig mockPostConfig = new HttpApiConfig("http://localhost:8091/", "POST", headers, null, null, null, "key1=value1\nkey2=value2", null, null, null);
+
+    Map<String, HttpApiConfig> configs = new HashMap<>();
+    configs.put("sunrise", mockSchema);
+    configs.put("mocktable", mockTable);
     configs.put("mockpost", mockPostConfig);
 
     HttpStoragePluginConfig mockStorageConfigWithWorkspace = new HttpStoragePluginConfig(false, configs, 2, "", 80, "", "", "");
     mockStorageConfigWithWorkspace.setEnabled(true);
-    cluster.defineStoragePlugin("api", mockStorageConfigWithWorkspace);
+    cluster.defineStoragePlugin("local", mockStorageConfigWithWorkspace);
   }
 
   @Test
   public void verifyPluginConfig() throws Exception {
-    String sql = "SELECT SCHEMA_NAME, TYPE FROM INFORMATION_SCHEMA.`SCHEMATA` WHERE TYPE='http'";
+    String sql = "SELECT SCHEMA_NAME, TYPE FROM INFORMATION_SCHEMA.`SCHEMATA` WHERE TYPE='http'\n" +
+        "ORDER BY SCHEMA_NAME";
 
     RowSet results = client.queryBuilder().sql(sql).rowSet();
 
@@ -110,15 +149,17 @@ public class TestHttpPlugin extends ClusterTest {
       .add("TYPE", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
       .buildSchema();
 
+    // Expect table-like connections to NOT appear here.
     RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-      .addRow("api.mock", "http")
-      .addRow("api.mockpost", "http")
-      .addRow("api.stock", "http")
-      .addRow("api.sunrise", "http")
-      .addRow("api", "http")
-      .build();
+        .addRow("live", "http") // For table-like connections
+        .addRow("live.stock", "http")
+        .addRow("live.sunrise", "http")
+        .addRow("local", "http")
+        .addRow("local.mockpost", "http")
+        .addRow("local.sunrise", "http")
+        .build();
 
-    new RowSetComparison(expected).verifyAndClearAll(results);
+    RowSetUtilities.verify(expected, results);
   }
 
   /**
@@ -152,7 +193,7 @@ public class TestHttpPlugin extends ClusterTest {
   @Test
   @Ignore("Requires Remote Server")
   public void simpleStarQuery() throws Exception {
-    String sql = "SELECT * FROM api.sunrise.`/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+    String sql = "SELECT * FROM live.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
 
     RowSet results = client.queryBuilder().sql(sql).rowSet();
 
@@ -173,19 +214,69 @@ public class TestHttpPlugin extends ClusterTest {
       .build();
 
     RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-      .addRow( mapValue("6:13:58 AM", "5:59:55 PM", "12:06:56 PM", "11:45:57", "5:48:14 AM", "6:25:38 PM", "5:18:16 AM", "6:55:36 PM", "4:48:07 AM", "7:25:45 PM"), "OK")
+      .addRow(mapValue("6:13:58 AM", "5:59:55 PM", "12:06:56 PM", "11:45:57",
+                       "5:48:14 AM", "6:25:38 PM", "5:18:16 AM", "6:55:36 PM",
+                       "4:48:07 AM", "7:25:45 PM"), "OK")
       .build();
 
-    int resultCount =  results.rowCount();
-    new RowSetComparison(expected).verifyAndClearAll(results);
+    RowSetUtilities.verify(expected, results);
+  }
 
-    assertEquals(1,  resultCount);
+  /**
+   * As above, but we return only the contents of {@code results}, and use
+   * filter push-down for the arguments.
+   *
+   * @throws Exception
+   */
+  @Test
+  @Ignore("Requires Remote Server")
+  public void wildcardQueryWithParams() throws Exception {
+    String sql =
+        "SELECT * FROM live.sunrise2\n" +
+        "WHERE `lat`=36.7201600 AND `lng`=-4.4203400 AND `date`='2019-10-02'";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("sunrise", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("sunset", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("solar_noon", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("day_length", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("civil_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("civil_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("nautical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("nautical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("astronomical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("astronomical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .build();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow("6:13:58 AM", "5:59:55 PM", "12:06:56 PM", "11:45:57", "5:48:14 AM",
+              "6:25:38 PM", "5:18:16 AM", "6:55:36 PM", "4:48:07 AM", "7:25:45 PM")
+      .build();
+
+    RowSetUtilities.verify(expected, results);
   }
 
   @Test
   @Ignore("Requires Remote Server")
   public void simpleSpecificQuery() throws Exception {
-    String sql = "SELECT t1.results.sunrise AS sunrise, t1.results.sunset AS sunset FROM api.sunrise.`/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02` AS t1";
+    String sql = "SELECT t1.results.sunrise AS sunrise, t1.results.sunset AS sunset\n" +
+                 "FROM live.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02` AS t1";
+    doSimpleSpecificQuery(sql);
+  }
+
+  @Test
+  @Ignore("Requires Remote Server")
+  public void simpleSpecificQueryWithParams() throws Exception {
+    String sql =
+        "SELECT sunrise, sunset\n" +
+        "FROM live.sunrise2\n" +
+        "WHERE `lat`=36.7201600 AND `lng`=-4.4203400 AND `date`='2019-10-02'";
+    doSimpleSpecificQuery(sql);
+  }
+
+  private void doSimpleSpecificQuery(String sql) throws Exception {
 
     RowSet results = client.queryBuilder().sql(sql).rowSet();
 
@@ -198,7 +289,7 @@ public class TestHttpPlugin extends ClusterTest {
       .addRow("6:13:58 AM", "5:59:55 PM")
       .build();
 
-    new RowSetComparison(expected).verifyAndClearAll(results);
+    RowSetUtilities.verify(expected, results);
   }
 
   @Test
@@ -210,15 +301,27 @@ public class TestHttpPlugin extends ClusterTest {
           .setBody(TEST_JSON_RESPONSE)
       );
 
-      String sql = "SELECT COUNT(*) FROM api.mock.`/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+      String sql = "SELECT COUNT(*) FROM local.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
       String plan = queryBuilder().sql(sql).explainJson();
       long cnt = queryBuilder().physical(plan).singletonLong();
-      assertEquals("Counts should match",1L, cnt);
+      assertEquals("Counts should match", 1L, cnt);
     }
   }
 
   @Test
   public void simpleTestWithMockServer() throws Exception {
+    String sql = "SELECT * FROM local.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+    doSimpleTestWithMockServer(sql);
+  }
+
+  @Test
+  public void simpleTestWithMockServerWithParams() throws Exception {
+    String sql = "SELECT * FROM local.mocktable\n" +
+                 "WHERE `lat` = 36.7201600 AND `lng` = -4.4203400 AND `date` = '2019-10-02'";
+    doSimpleTestWithMockServer(sql);
+  }
+
+  private void doSimpleTestWithMockServer(String sql) throws Exception {
     try (MockWebServer server = startServer()) {
 
       server.enqueue(
@@ -226,11 +329,9 @@ public class TestHttpPlugin extends ClusterTest {
           .setBody(TEST_JSON_RESPONSE)
       );
 
-      String sql = "SELECT * FROM api.mock.`json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
       RowSet results = client.queryBuilder().sql(sql).rowSet();
 
       TupleMetadata expectedSchema = new SchemaBuilder()
-        .addMap("results")
         .add("sunrise", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
         .add("sunset", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
         .add("solar_noon", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
@@ -241,18 +342,13 @@ public class TestHttpPlugin extends ClusterTest {
         .add("nautical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
         .add("astronomical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
         .add("astronomical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-        .resumeSchema()
-        .add("status", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
         .build();
 
       RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-        .addRow(mapValue("6:13:58 AM", "5:59:55 PM", "12:06:56 PM", "11:45:57", "5:48:14 AM", "6:25:38 PM", "5:18:16 AM", "6:55:36 PM", "4:48:07 AM", "7:25:45 PM"), "OK")
+        .addRow("6:13:58 AM", "5:59:55 PM", "12:06:56 PM", "11:45:57", "5:48:14 AM", "6:25:38 PM", "5:18:16 AM", "6:55:36 PM", "4:48:07 AM", "7:25:45 PM")
         .build();
 
-      int resultCount =  results.rowCount();
-      new RowSetComparison(expected).verifyAndClearAll(results);
-
-      assertEquals(1,  resultCount);
+      RowSetUtilities.verify(expected, results);
     }
   }
 
@@ -266,7 +362,7 @@ public class TestHttpPlugin extends ClusterTest {
           .setBody(TEST_JSON_RESPONSE)
       );
 
-      String sql = "SELECT * FROM api.mockPost.`json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+      String sql = "SELECT * FROM local.mockPost.`json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
       RowSet results = client.queryBuilder().sql(sql).rowSet();
 
       TupleMetadata expectedSchema = new SchemaBuilder()
@@ -289,14 +385,12 @@ public class TestHttpPlugin extends ClusterTest {
         .addRow(mapValue("6:13:58 AM", "5:59:55 PM", "12:06:56 PM", "11:45:57", "5:48:14 AM", "6:25:38 PM", "5:18:16 AM", "6:55:36 PM", "4:48:07 AM", "7:25:45 PM"), "OK")
         .build();
 
-      int resultCount =  results.rowCount();
-      new RowSetComparison(expected).verifyAndClearAll(results);
+      RowSetUtilities.verify(expected, results);
 
       RecordedRequest recordedRequest = server.takeRequest();
       assertEquals("POST", recordedRequest.getMethod());
       assertEquals(recordedRequest.getHeader("header1"), "value1");
       assertEquals(recordedRequest.getHeader("header2"), "value2");
-      assertEquals(1,  resultCount);
     }
   }
 
@@ -309,7 +403,7 @@ public class TestHttpPlugin extends ClusterTest {
           .setBody(TEST_JSON_RESPONSE)
       );
 
-      String sql = "SELECT t1.results.sunrise AS sunrise, t1.results.sunset AS sunset FROM api.mock.`/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02` AS t1";
+      String sql = "SELECT sunrise, sunset FROM local.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02` AS t1";
 
       RowSet results = client.queryBuilder().sql(sql).rowSet();
 
@@ -322,7 +416,7 @@ public class TestHttpPlugin extends ClusterTest {
         .addRow("6:13:58 AM", "5:59:55 PM")
         .build();
 
-      new RowSetComparison(expected).verifyAndClearAll(results);
+      RowSetUtilities.verify(expected, results);
     }
   }
 
@@ -336,7 +430,7 @@ public class TestHttpPlugin extends ClusterTest {
           .throttleBody(64, 4, TimeUnit.SECONDS)
       );
 
-      String sql = "SELECT t1.results.sunrise AS sunrise, t1.results.sunset AS sunset FROM api.mock.`/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02` AS t1";
+      String sql = "SELECT sunrise AS sunrise, sunset AS sunset FROM local.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02` AS t1";
 
       try {
         client.queryBuilder().sql(sql).rowSet();
@@ -356,15 +450,16 @@ public class TestHttpPlugin extends ClusterTest {
           .setBody("")
       );
 
-      String sql = "SELECT * FROM api.mock.`/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+      String sql = "SELECT * FROM local.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
 
       RowSet results = client.queryBuilder().sql(sql).rowSet();
       assertNull(results);
     }
   }
 
-  // Note that, in this test, the response is not empty. Instead, the
-  // response has a single row with no columns.
+  // The connection expects a response object of the form
+  // { results: { ... } }. When no such object appears, the response
+  // is treated as a null (no data, no schema) result set.
   @Test
   public void testEmptyJSONObjectResponse() throws Exception {
     try (MockWebServer server = startServer()) {
@@ -374,7 +469,41 @@ public class TestHttpPlugin extends ClusterTest {
           .setBody("{}")
       );
 
-      String sql = "SELECT * FROM api.mock.`/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+      String sql = "SELECT * FROM local.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+      assertNull(results);
+    }
+  }
+
+  @Test
+  public void testNullContent() throws Exception {
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(
+        new MockResponse().setResponseCode(200)
+          .setBody("{results: null}")
+      );
+
+      String sql = "SELECT * FROM local.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+      assertNull(results);
+    }
+  }
+
+  // Note that, in this test, the response is not empty. Instead, the
+  // response has a single row with no columns.
+  @Test
+  public void testEmptyContent() throws Exception {
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(
+        new MockResponse().setResponseCode(200)
+          .setBody("{results: {} }")
+      );
+
+      String sql = "SELECT * FROM local.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
 
       RowSet results = client.queryBuilder().sql(sql).rowSet();
 
@@ -385,7 +514,7 @@ public class TestHttpPlugin extends ClusterTest {
         .addRow()
         .build();
 
-      new RowSetComparison(expected).verifyAndClearAll(results);
+      RowSetUtilities.verify(expected, results);
     }
   }
 
@@ -398,13 +527,18 @@ public class TestHttpPlugin extends ClusterTest {
           .setBody("{}")
       );
 
-      String sql = "SELECT * FROM api.mock.`/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+      String sql = "SELECT * FROM local.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
 
       try {
         client.queryBuilder().sql(sql).rowSet();
         fail();
       } catch (Exception e) {
-        assertTrue(e.getMessage().contains("DATA_READ ERROR: Error retrieving data from HTTP Storage Plugin: 404 Client Error"));
+        String msg = e.getMessage();
+        assertTrue(msg.contains("DATA_READ ERROR: HTTP request failed"));
+        assertTrue(msg.contains("Response code: 404"));
+        assertTrue(msg.contains("Response message: Client Error"));
+        assertTrue(msg.contains("Connection: sunrise"));
+        assertTrue(msg.contains("Plugin: local"));
       }
     }
   }
@@ -418,12 +552,10 @@ public class TestHttpPlugin extends ClusterTest {
           .setBody(TEST_JSON_RESPONSE)
       );
 
-      String sql = "SELECT * FROM api.mock.`json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+      String sql = "SELECT * FROM local.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
       RowSet results = client.queryBuilder().sql(sql).rowSet();
 
-
       TupleMetadata expectedSchema = new SchemaBuilder()
-        .addMap("results")
         .add("sunrise", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
         .add("sunset", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
         .add("solar_noon", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
@@ -434,18 +566,14 @@ public class TestHttpPlugin extends ClusterTest {
         .add("nautical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
         .add("astronomical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
         .add("astronomical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-        .resumeSchema()
-        .add("status", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
         .build();
 
       RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-        .addRow( mapValue("6:13:58 AM", "5:59:55 PM", "12:06:56 PM", "11:45:57", "5:48:14 AM", "6:25:38 PM", "5:18:16 AM", "6:55:36 PM", "4:48:07 AM", "7:25:45 PM"), "OK")
+        .addRow("6:13:58 AM", "5:59:55 PM", "12:06:56 PM", "11:45:57", "5:48:14 AM",
+                "6:25:38 PM", "5:18:16 AM", "6:55:36 PM", "4:48:07 AM", "7:25:45 PM")
         .build();
 
-      int resultCount =  results.rowCount();
-      new RowSetComparison(expected).verifyAndClearAll(results);
-
-      assertEquals(1,  resultCount);
+      RowSetUtilities.verify(expected, results);
 
       RecordedRequest request = server.takeRequest();
       assertEquals("value1", request.getHeader("header1"));
@@ -455,18 +583,6 @@ public class TestHttpPlugin extends ClusterTest {
   }
 
   /**
-   * Helper function to convert files to a readable input steam.
-   * @param file The input file to be read
-   * @return A buffer to the file
-   * @throws IOException If the file is unreadable, throws an IOException
-   */
-  private Buffer fileToBytes(File file) throws IOException {
-    Buffer result = new Buffer();
-    result.writeAll(Okio.source(file));
-    return result;
-  }
-
-  /**
    * Helper function to start the MockHTTPServer
    * @return Started Mock server
    * @throws IOException If the server cannot start, throws IOException
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ScanStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ScanStats.java
index 7c5656f..a699120 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ScanStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ScanStats.java
@@ -20,6 +20,15 @@ package org.apache.drill.exec.physical.base;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+/**
+ * Cost estimate for a scan. In general, relative costs are more important
+ * than absolute costs. If a scan supports filter push-down, the cost of
+ * the scan after the push-down must be less than the combined cost of
+ * the scan + project before the push-down, else Calcite will ignore the
+ * push-down. Also, the estimated row count may influence whether the
+ * table can be broadcast or hash partitioned. Otherwise, Calcite has
+ * no real choices based on scan cost.
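+ * For example, a scan that accepts a filter push-down might scale its
+ * estimated row count by the estimated selectivity of the pushed
+ * predicates so that the resulting plan costs less than the original.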
+ */
 public class ScanStats {
 
   public static final ScanStats TRIVIAL_TABLE = new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, 20, 1, 1);
@@ -30,8 +39,21 @@ public class ScanStats {
   private final GroupScanProperty groupScanProperty;
   @JsonProperty
   private final double recordCount;
+
+  /**
+   * CPU cost for the scan, which should consider both row and column
+   * counts, and the effect of filters. Considered only if the group scan property is
+   * set to {@link GroupScanProperty#ESTIMATED_TOTAL_COST}. The default
+   * CPU cost is simply row count * column count.
+   */
   @JsonProperty
   private final double cpuCost;
+
+  /**
+   * I/O cost for the scan. Considered only if the group scan property is
+   * set to {@link GroupScanProperty#ESTIMATED_TOTAL_COST FULL_COST}. Drill does not
+   * differentiate between network and disk I/O, despite the field name.
+   */
   @JsonProperty
   private final double diskCost;
 
@@ -82,7 +104,15 @@ public class ScanStats {
 
   public enum GroupScanProperty {
     NO_EXACT_ROW_COUNT(false, false),
-    EXACT_ROW_COUNT(true, true);
+    EXACT_ROW_COUNT(true, true),
+
+    /**
+     * Tells the planner to consider the full cost represented
+     * here; otherwise, the planner looks only at the row count. However,
+     * since we don't know the actual row count, a COUNT(*) query must
+     * still look at the input source if it wants an accurate count.
+     */
+    ESTIMATED_TOTAL_COST(false, true);
 
     private boolean hasExactRowCount, hasExactColumnValueCount;
 
@@ -98,5 +128,9 @@ public class ScanStats {
     public boolean hasExactColumnValueCount() {
       return hasExactColumnValueCount;
     }
+
+    public boolean hasFullCost() {
+      return this == ESTIMATED_TOTAL_COST;
+    }
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnState.java
index 6d67ebc..cfa0b04 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnState.java
@@ -306,9 +306,15 @@ public abstract class ColumnState {
 
   public int innerCardinality() {
     ColumnMetadata schema = schema();
-    return schema.isArray()
-        ? cardinality * schema.expectedElementCount()
-        : cardinality;
+    if (schema.isArray()) {
+      // Multiply out the cardinality, but place reasonable limits:
+      // at least one element per inner array, capped at the maximum row
+      // count to prevent cardinality explosions for deeply nested arrays.
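+      // For example, a cardinality of 4096 with 10 expected elements
+      // per array yields an inner cardinality of 40,960.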
+      int elementCount = Math.max(1, schema.expectedElementCount());
+      return Math.min(ValueVector.MAX_ROW_COUNT, cardinality * elementCount);
+    } else {
+      return cardinality;
+    }
   }
 
   public void buildOutput(TupleState tupleState) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillCostBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillCostBase.java
index f64cd77..97ee688 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillCostBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillCostBase.java
@@ -152,6 +152,7 @@ public class DrillCostBase implements DrillRelOptCost {
     return network;
   }
 
+  @Override
   public double getMemory() {
     return memory;
   }
@@ -307,29 +308,34 @@ public class DrillCostBase implements DrillRelOptCost {
       return new DrillCostBase(dRows, dCpu, dIo, dNetwork, dMemory);
     }
 
+    @Override
     public RelOptCost makeCost(double dRows, double dCpu, double dIo, double dNetwork) {
       return new DrillCostBase(dRows, dCpu, dIo, dNetwork, 0);
     }
 
+    @Override
     public RelOptCost makeCost(double dRows, double dCpu, double dIo) {
       return new DrillCostBase(dRows, dCpu, dIo, 0, 0);
     }
 
+    @Override
     public RelOptCost makeHugeCost() {
       return DrillCostBase.HUGE;
     }
 
+    @Override
     public RelOptCost makeInfiniteCost() {
       return DrillCostBase.INFINITY;
     }
 
+    @Override
     public RelOptCost makeTinyCost() {
       return DrillCostBase.TINY;
     }
 
+    @Override
     public RelOptCost makeZeroCost() {
       return DrillCostBase.ZERO;
     }
   }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
index d38210d..e11dad2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
@@ -29,7 +29,6 @@ import org.apache.drill.common.logical.data.Scan;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.planner.common.DrillScanRelBase;
-import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.apache.drill.exec.planner.torel.ConversionContext;
@@ -47,18 +46,18 @@ import org.apache.drill.exec.util.Utilities;
  * GroupScan of a Drill table.
  */
 public class DrillScanRel extends DrillScanRelBase implements DrillRel {
-  private final static int STAR_COLUMN_COST = 10000;
+  public static final int STAR_COLUMN_COST = 10_000;
+
   private PlannerSettings settings;
-  private List<SchemaPath> columns;
+  private final List<SchemaPath> columns;
   private final boolean partitionFilterPushdown;
-  final private RelDataType rowType;
+  private final RelDataType rowType;
 
-  /** Creates a DrillScan. */
   public DrillScanRel(final RelOptCluster cluster, final RelTraitSet traits,
                       final RelOptTable table) {
     this(cluster, traits, table, false);
   }
-  /** Creates a DrillScan. */
+
   public DrillScanRel(final RelOptCluster cluster, final RelTraitSet traits,
                       final RelOptTable table, boolean partitionFilterPushdown) {
     // By default, scan does not support project pushdown.
@@ -67,13 +66,11 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel {
     this.settings = PrelUtil.getPlannerSettings(cluster.getPlanner());
   }
 
-  /** Creates a DrillScan. */
   public DrillScanRel(final RelOptCluster cluster, final RelTraitSet traits,
                       final RelOptTable table, final RelDataType rowType, final List<SchemaPath> columns) {
     this(cluster, traits, table, rowType, columns, false);
   }
 
-  /** Creates a DrillScan. */
   public DrillScanRel(final RelOptCluster cluster, final RelTraitSet traits,
                       final RelOptTable table, final RelDataType rowType, final List<SchemaPath> columns, boolean partitionFilterPushdown) {
     super(cluster, traits, table, columns);
@@ -100,15 +97,6 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel {
     this.partitionFilterPushdown = partitionFilterPushdown;
   }
 
-//
-//  private static GroupScan getCopy(GroupScan scan){
-//    try {
-//      return (GroupScan) scan.getNewWithChildren((List<PhysicalOperator>) (Object) Collections.emptyList());
-//    } catch (ExecutionSetupException e) {
-//      throw new DrillRuntimeException("Unexpected failure while coping node.", e);
-//    }
-//  }
-
   public List<SchemaPath> getColumns() {
     return this.columns;
   }
@@ -142,34 +130,45 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel {
     return getGroupScan().getScanStats(settings).getRecordCount();
   }
 
-  /// TODO: this method is same as the one for ScanPrel...eventually we should consolidate
-  /// this and few other methods in a common base class which would be extended
-  /// by both logical and physical rels.
+  // TODO: This method is the same as the one for ScanPrel. Eventually we should
+  // consolidate this and a few other methods into a common base class extended
+  // by both the logical and physical rels.
+  // TODO: Further changes may have caused the versions to diverge.
+  // TODO: Does not compute IO cost by default, but should. Changing that may break
+  // existing plugins.
   @Override
   public RelOptCost computeSelfCost(final RelOptPlanner planner, RelMetadataQuery mq) {
     final ScanStats stats = getGroupScan().getScanStats(settings);
-    int columnCount = getRowType().getFieldCount();
-    double ioCost = 0;
-    boolean isStarQuery = Utilities.isStarQuery(columns);
-
-    if (isStarQuery) {
-      columnCount = STAR_COLUMN_COST;
-    }
+    int columnCount = Utilities.isStarQuery(columns) ? STAR_COLUMN_COST : getRowType().getFieldCount();
 
     // double rowCount = RelMetadataQuery.getRowCount(this);
-    double rowCount = stats.getRecordCount();
-    if (rowCount < 1) {
-      rowCount = 1;
+    double rowCount = Math.max(1, stats.getRecordCount());
+
+    double valueCount = rowCount * columnCount;
+    if (PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
+      // TODO: makeCost() wants a row count, but we provide a value count.
+      // Likely a bug, but too risky to change as it may affect existing plugins.
+      // If we do make the fix, then the default costing path is the same as the
+      // full cost path.
+      // TODO: At this late date, with many plugins exploiting (if only by
+      // accident) the default costing here, it is not clear that we even want
+      // the planner to control the cost model; perhaps this path should be removed.
+      return planner.getCostFactory().makeCost(valueCount, stats.getCpuCost(), stats.getDiskCost());
     }
 
-    if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
-      return planner.getCostFactory().makeCost(rowCount * columnCount, stats.getCpuCost(), stats.getDiskCost());
+    double cpuCost;
+    double ioCost;
+    if (stats.getGroupScanProperty().hasFullCost()) {
+      cpuCost = stats.getCpuCost();
+      ioCost = stats.getDiskCost();
+    } else {
+      // for now, assume cpu cost is proportional to row count and number of columns
+      cpuCost = valueCount;
+
+      // Default IO cost is zero; ideally it would be proportional to valueCount
+      ioCost = 0;
     }
-
-    double cpuCost = rowCount * columnCount; // for now, assume cpu cost is proportional to row count and number of columns
-
-    DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory();
-    return costFactory.makeCost(rowCount, cpuCost, ioCost, 0);
+    return planner.getCostFactory().makeCost(rowCount, cpuCost, ioCost);
   }
 
   public boolean partitionFilterPushdown() {
@@ -192,5 +191,4 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel {
 
     return projectedColumns;
   }
-
 }
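
The practical effect of the revised cost model is that a GroupScan can now
supply its own full cost through ScanStats rather than having the planner
synthesize one. A minimal sketch, assuming a GroupScanProperty for which
hasFullCost() returns true (ESTIMATED_TOTAL_COST here) and hypothetical
estimate helpers; the ScanStats accessor names follow the usage above:

    // Hypothetical plugin override. estimateRowCount(), columnCount() and
    // avgRowWidthBytes() are illustrative stand-ins for plugin-specific logic.
    @Override
    public ScanStats getScanStats(PlannerSettings settings) {
      double rowCount = estimateRowCount();
      // CPU cost proportional to the number of values scanned.
      double cpuCost = rowCount * columnCount();
      // IO cost proportional to the bytes read.
      double ioCost = rowCount * avgRowWidthBytes();
      return new ScanStats(GroupScanProperty.ESTIMATED_TOTAL_COST,
          rowCount, cpuCost, ioCost);
    }

Because hasFullCost() is true, computeSelfCost() above uses these numbers
as given; a filter push-down that lowers the estimated row count then yields
a visibly cheaper plan, which is what lets Calcite accept the push-down.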
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchemaFactory.java
index ab22b76..9e45545 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchemaFactory.java
@@ -32,5 +32,4 @@ public abstract class AbstractSchemaFactory implements SchemaFactory {
   public String getName() {
     return name;
   }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
index b434750..69ddfc1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
@@ -141,6 +141,7 @@ public class JsonLoaderImpl implements JsonLoader, ErrorFactory {
     private CustomErrorContext errorContext;
     private InputStream stream;
     private Reader reader;
+    private String dataPath;
     private MessageParser messageParser;
 
     public JsonLoaderBuilder resultSetLoader(ResultSetLoader rsLoader) {
@@ -183,6 +184,11 @@ public class JsonLoaderImpl implements JsonLoader, ErrorFactory {
       return this;
     }
 
+    public JsonLoaderBuilder dataPath(String dataPath) {
+      this.dataPath = dataPath;
+      return this;
+    }
+
     public JsonLoader build() {
       // Defaults, primarily for testing.
       if (options == null) {
@@ -230,6 +236,7 @@ public class JsonLoaderImpl implements JsonLoader, ErrorFactory {
             .rootListener(rowListener)
             .errorFactory(this)
             .messageParser(builder.messageParser)
+            .dataPath(builder.dataPath)
             .build();
   }
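
A reader wires the new option through the builder. A minimal sketch, assuming
stream-source and options methods matching the builder fields shown above;
rsLoader, options, and in come from the enclosing reader:

    // Hypothetical usage of the new dataPath() option; "response/results"
    // is an illustrative slash-separated path into the REST message.
    JsonLoader loader = new JsonLoaderImpl.JsonLoaderBuilder()
        .resultSetLoader(rsLoader)
        .options(options)
        .fromStream(in)
        .dataPath("response/results") // skip the wrapper, read the payload
        .build();

dataPath() is the declarative alternative to messageParser(): give a path
when the standard slash-separated form suffices, or supply a custom
MessageParser when the message structure needs bespoke handling.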
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java
index 5c4425d..23ff3e1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java
@@ -22,7 +22,8 @@ import java.io.InputStream;
 import java.io.Reader;
 
 import org.apache.drill.exec.store.easy.json.parser.MessageParser.MessageContextException;
-import org.apache.drill.exec.store.easy.json.parser.RootParser.NestedRootArrayParser;
+import org.apache.drill.exec.store.easy.json.parser.RootParser.EmbeddedArrayParser;
+import org.apache.drill.exec.store.easy.json.parser.RootParser.EmbeddedObjectParser;
 import org.apache.drill.exec.store.easy.json.parser.RootParser.RootArrayParser;
 import org.apache.drill.exec.store.easy.json.parser.RootParser.RootObjectParser;
 import org.apache.drill.exec.store.easy.json.parser.TokenIterator.RecoverableJsonException;
@@ -38,7 +39,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 /**
  * Parser for a subset of the <a href="http://jsonlines.org/">jsonlines</a>
  * format. In particular, supports line-delimited JSON objects, or a single
- * array which holds a list of JSON objects.
+ * array which holds a list of JSON objects. Although jsonlines requires
+ * a newline separator between objects, this parser is more relaxed: it
+ * allows any whitespace, or no whitespace at all. It simply looks for the
+ * pattern <code>{ ... } { ... }</code> when reading top-level objects.
  * <p>
  * Alternatively, a message parser can provide a path to an array of JSON
  * objects within a messages such as a REST response.
@@ -164,6 +168,10 @@ public class JsonStructureParser {
       throw errorFactory().ioException(e);
     }
     tokenizer = new TokenIterator(parser, options, errorFactory());
+
+    // Parse to the start of the data object(s), and create a root
+    // state to parse objects and watch for the end of data.
+    // The root state parses one object on each next() call.
     if (builder.messageParser == null) {
       rootState = makeRootState();
     } else {
@@ -175,17 +183,6 @@ public class JsonStructureParser {
   public ErrorFactory errorFactory() { return errorFactory; }
   public ObjectListener rootListener() { return rootListener; }
 
-  private RootParser makeCustomRoot(MessageParser messageParser) {
-    try {
-      if (! messageParser.parsePrefix(tokenizer)) {
-        return null;
-      }
-    } catch (MessageContextException e) {
-      throw errorFactory.messageParseError(e);
-    }
-    return new NestedRootArrayParser(this, messageParser);
-  }
-
   private RootParser makeRootState() {
     JsonToken token = tokenizer.next();
     if (token == null) {
@@ -215,6 +212,29 @@ public class JsonStructureParser {
     }
   }
 
+  private RootParser makeCustomRoot(MessageParser messageParser) {
+    try {
+      if (!messageParser.parsePrefix(tokenizer)) {
+        return null;
+      }
+    } catch (MessageContextException e) {
+      throw errorFactory.messageParseError(e);
+    }
+    JsonToken token = tokenizer.requireNext();
+    switch (token) {
+      case VALUE_NULL:
+        // If the value is null, just treat it as no data.
+        return null;
+      case START_ARRAY:
+        return new EmbeddedArrayParser(this, messageParser);
+      case START_OBJECT:
+        tokenizer.unget(token);
+        return new EmbeddedObjectParser(this, messageParser);
+      default:
+        throw new IllegalStateException("Message parser misbehaved: " + token.name());
+    }
+  }
+
   public boolean next() {
     if (rootState == null) {
       // Only occurs for an empty document
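
Concretely, makeCustomRoot() dispatches on the first token the message
parser leaves at the data location. The three accepted message shapes
(illustrative JSON):

    { status: "fail", data: null }            // VALUE_NULL:   no data
    { status: "ok", data: [ {...}, {...} ] }  // START_ARRAY:  EmbeddedArrayParser
    { status: "ok", data: {...} }             // START_OBJECT: EmbeddedObjectParser

Any other token means the message parser misbehaved, hence the
IllegalStateException in the default case.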
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/MessageParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/MessageParser.java
index 5b02f97..f358f63 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/MessageParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/MessageParser.java
@@ -38,6 +38,7 @@ public interface MessageParser {
       this.nextElement = nextElement;
     }
   }
+
   boolean parsePrefix(TokenIterator tokenizer) throws MessageContextException;
   void parseSuffix(TokenIterator tokenizer) throws MessageContextException;
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/RootParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/RootParser.java
index 32460a2..1252c3e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/RootParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/RootParser.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.easy.json.parser;
 
 import org.apache.drill.exec.store.easy.json.parser.MessageParser.MessageContextException;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -26,8 +27,11 @@ import com.fasterxml.jackson.core.JsonToken;
 /**
  * The root parsers are special: they must detect EOF. Drill supports
  * top-level objects either enclosed in an array (which forms legal
- * JSON), or as a series JSON objects (which is a common, if not
- * entirely legal, form of JSON.)
+ * JSON), or as the
+ * <a href="http://jsonlines.org/">jsonlines</a> format, restricted
+ * to a list of objects (but not scalars or arrays). Although jsonlines
+ * requires newline separators between objects, this parser allows
+ * any amount of whitespace, including none.
  */
 public abstract class RootParser implements ElementParser {
   protected static final Logger logger = LoggerFactory.getLogger(RootParser.class);
@@ -40,13 +44,32 @@ public abstract class RootParser implements ElementParser {
     this.rootObject = new ObjectParser(this, structParser.rootListener());
   }
 
+  /**
+   * Parse one data object. This is the "root" object which may contain
+   * nested objects. Overridden to handle different end-of-data indicators
+   * for different contexts.
+   *
+   * @return {@code true} if an object was found, {@code false} if the
+   * end of data was reached.
+   */
   public abstract boolean parseRoot(TokenIterator tokenizer);
 
+  // Generic parsing not allowed at the root since the root must
+  // report EOF. Use parseRoot() instead.
   @Override
   public void parse(TokenIterator tokenizer) {
-    throw new UnsupportedOperationException();
+    throw new UnsupportedOperationException(
+        "Call parseRoot() at the root level to check for EOF.");
   }
 
+  /**
+   * Parse the body of one data object, which may contain nested
+   * objects. Called once a subclass's {@code parseRoot()} has
+   * detected the start token for a data object.
+   *
+   * @return {@code true} if an object was found, {@code false} if the
+   * end of data was reached.
+   */
   protected boolean parseRootObject(JsonToken token, TokenIterator tokenizer) {
     // Position: ^ ?
     switch (token) {
@@ -77,6 +100,11 @@ public abstract class RootParser implements ElementParser {
   @Override
   public JsonStructureParser structParser() { return structParser; }
 
+  /**
+   * Parser for a <a href="http://jsonlines.org/">jsonlines</a>-style
+   * data set which consists of a series of objects. EOF from the parser
+   * indicates the end of the data set.
+   */
   public static class RootObjectParser extends RootParser {
 
     public RootObjectParser(JsonStructureParser structParser) {
@@ -89,12 +117,20 @@ public abstract class RootParser implements ElementParser {
       if (token == null) {
         // Position: EOF ^
         return false;
-      } else {
+      } else if (token == JsonToken.START_OBJECT) {
         return parseRootObject(token, tokenizer);
+      } else {
+        throw errorFactory().syntaxError(token); // Nothing else is valid
       }
     }
   }
 
+  /**
+   * Parser for a compliant JSON data set which consists of an
+   * array at the top level, where each element of the array is a
+   * JSON object that represents a data record. A closing array
+   * bracket indicates end of data.
+   */
   public static class RootArrayParser extends RootParser {
 
     public RootArrayParser(JsonStructureParser structParser) {
@@ -112,43 +148,96 @@ public abstract class RootParser implements ElementParser {
         logger.warn("Failed to close outer array. {}",
             tokenizer.context());
         return false;
-      } else if (token == JsonToken.END_ARRAY) {
-        return false;
-      } else {
-        return parseRootObject(token, tokenizer);
+      }
+      switch (token) {
+        case END_ARRAY:
+          return false;
+        case START_OBJECT:
+          return parseRootObject(token, tokenizer);
+        default:
+          throw errorFactory().syntaxError(token); // Nothing else is valid
       }
     }
   }
 
-  public static class NestedRootArrayParser extends RootParser {
+  /**
+   * Parser for data embedded within a message structure which is
+   * encoded as an array of objects. Example:
+   * <pre><code>
+   * { status: "ok", results: [ { ... }, { ... } ] }</code></pre>
+   * <p>
+   * The closing array bracket indicates the end of data; the
+   * message parser will parse any content after the closing
+   * bracket.
+   */
+  public static class EmbeddedArrayParser extends RootParser {
 
     private final MessageParser messageParser;
 
-    public NestedRootArrayParser(JsonStructureParser structParser, MessageParser messageParser) {
+    public EmbeddedArrayParser(JsonStructureParser structParser, MessageParser messageParser) {
       super(structParser);
       this.messageParser = messageParser;
     }
 
     @Override
     public boolean parseRoot(TokenIterator tokenizer) {
-      JsonToken token = tokenizer.next();
-      if (token == null) {
-        // Position: { ... EOF ^
-        // Saw EOF, but no closing ]. Warn and ignore.
-        // Note that the Jackson parser won't let us get here;
-        // it will have already thrown a syntax error.
-        logger.warn("Failed to close outer array. {}",
-            tokenizer.context());
+      JsonToken token = tokenizer.requireNext();
+      switch (token) {
+        case END_ARRAY:
+          break;
+        case START_OBJECT:
+          return parseRootObject(token, tokenizer);
+        default:
+          throw errorFactory().syntaxError(token); // Nothing else is valid
+      }
+
+      // Parse the trailing message content.
+      try {
+        messageParser.parseSuffix(tokenizer);
         return false;
-      } else if (token == JsonToken.END_ARRAY) {
-        try {
-          messageParser.parseSuffix(tokenizer);
-        } catch (MessageContextException e) {
-          throw errorFactory().messageParseError(e);
+      } catch (MessageContextException e) {
+        throw errorFactory().messageParseError(e);
+      }
+    }
+  }
+
+  /**
+   * Parser for data embedded within a message structure which is encoded
+   * as a single JSON object. Example:
+   * <pre><code>
+   * { status: "ok", results: { ... } }</code></pre>
+   * <p>
+   * The parser returns the single object on its first call; the
+   * message parser then parses any content after the closing
+   * brace.
+   */
+  public static class EmbeddedObjectParser extends RootParser {
+
+    private final MessageParser messageParser;
+    private int objectCount;
+
+    public EmbeddedObjectParser(JsonStructureParser structParser, MessageParser messageParser) {
+      super(structParser);
+      this.messageParser = messageParser;
+    }
+
+    @Override
+    public boolean parseRoot(TokenIterator tokenizer) {
+      Preconditions.checkState(objectCount <= 1);
+      if (objectCount == 0) {
+        objectCount++;
+        JsonToken token = tokenizer.requireNext();
+        if (token == JsonToken.START_OBJECT) {
+          return parseRootObject(token, tokenizer);
+        } else {
+          throw errorFactory().syntaxError(token); // Nothing else is valid
         }
+      }
+      try {
+        messageParser.parseSuffix(tokenizer);
         return false;
-      } else {
-        return parseRootObject(token, tokenizer);
+      } catch (MessageContextException e) {
+        throw errorFactory().messageParseError(e);
       }
     }
   }
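
The two-call protocol of EmbeddedObjectParser can be sketched as follows
(structParser, messageParser, and tokenizer as above; illustrative only):

    RootParser root = new EmbeddedObjectParser(structParser, messageParser);
    boolean hasRow = root.parseRoot(tokenizer); // true: the one data object parsed
    boolean more = root.parseRoot(tokenizer);   // false: suffix parsed, end of data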
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/SimpleMessageParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/SimpleMessageParser.java
index 956b910..a5181f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/SimpleMessageParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/SimpleMessageParser.java
@@ -25,7 +25,7 @@ import com.fasterxml.jackson.core.JsonToken;
  * A message parser which accepts a path to the data encoded as a
  * slash-separated string. Given the following JSON message:
  *
- * <pre><code:
+ * <pre><code>
  * { status: {
  *     succeeded: true,
  *     runTimeMs: 123,
@@ -49,6 +49,19 @@ import com.fasterxml.jackson.core.JsonToken;
  * If the data path is not found then this class reports EOF of
  * the whole data stream. It may have skipped over the actual payload
  * if the path is mis-configured.
+ * <p>
+ * The payload can also be a single JSON object:
+ * <pre><code>
+ *   response: {
+ *     field1: "value1",
+ *     field2: "value2",
+ *     ...
+ *     },
+ * </code></pre>
+ * <p>
+ * This parser "ungets" the value token (start object or start
+ * array) so that the structure parser can determine which case
+ * to handle.
  */
 public class SimpleMessageParser implements MessageParser {
 
@@ -99,13 +112,14 @@ public class SimpleMessageParser implements MessageParser {
     JsonToken token = tokenizer.requireNext();
     if (level == path.length - 1) {
       switch (token) {
-      case VALUE_NULL:
-        return false;
-      case START_ARRAY:
-        return true;
-      default:
-        throw new MessageContextException(token,
-            path[level], "Expected JSON array for final path element");
+        case VALUE_NULL:
+        case START_ARRAY:
+        case START_OBJECT:
+          tokenizer.unget(token);
+          return true;
+        default:
+          throw new MessageContextException(token,
+              path[level], "Expected JSON array or object for final path element");
       }
     }
     if (token != JsonToken.START_OBJECT) {
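
A minimal sketch of the parser's use, assuming the constructor takes the
slash-separated path described in the javadoc above:

    // Hypothetical: select the payload at response.results within
    // a message of the form { response: { results: [ ... ] } }.
    MessageParser msgParser = new SimpleMessageParser("response/results");
    // parsePrefix() walks the path. On success it leaves the tokenizer
    // positioned at the payload's START_ARRAY, START_OBJECT, or VALUE_NULL
    // token (via unget()) so the structure parser can dispatch; it returns
    // false, reported as EOF, when the path is not found. It throws the
    // checked MessageContextException on malformed messages.
    boolean hasData = msgParser.parsePrefix(tokenizer);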
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/BaseTestJsonParser.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/BaseTestJsonParser.java
index 8e75109..4bcf3f0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/BaseTestJsonParser.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/BaseTestJsonParser.java
@@ -329,6 +329,16 @@ public class BaseTestJsonParser {
     fixture.close();
   }
 
+  protected static void expectOpenError(String json, String kind) {
+    JsonParserFixture fixture = new JsonParserFixture();
+    try {
+      fixture.open(json);
+      fail();
+    } catch (JsonErrorFixture e) {
+      assertEquals(kind, e.errorType);
+    }
+  }
+
   protected static void expectError(JsonParserFixture fixture, String kind) {
     try {
       fixture.read();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/TestJsonParserErrors.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/TestJsonParserErrors.java
index 2693635..93e2327 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/TestJsonParserErrors.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/TestJsonParserErrors.java
@@ -26,7 +26,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 /**
- * Tests the un-happy path cases in the JSON structure parser. Some
+ * Tests the unhappy path cases in the JSON structure parser. Some
  * error cases can't occur because the Jackson parser catches them
  * first.
  */
@@ -63,6 +63,26 @@ public class TestJsonParserErrors extends BaseTestJsonParser {
   }
 
   @Test
+  public void testRootScalar() {
+    expectOpenError("10", "syntaxError");
+  }
+
+  // Should we treat a root null as an empty result set?
+  @Test
+  public void testRootNull() {
+    expectOpenError("null", "syntaxError");
+  }
+
+  @Test
+  public void testRootArrayScalar() {
+    expectError("[ {a: \"ok\" }, 10 ]", "syntaxError");
+  }
+
+  @Test
+  public void testRootArrayNull() {
+    expectError("[ {a: \"ok\" }, null ]", "syntaxError");
+  }
+
+  @Test
   public void testRootArrayDisallowed() {
     final String json = "[{a: 0}, {a: 100}, {a: null}]";
     JsonParserFixture fixture = new JsonParserFixture();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/TestJsonParserMessage.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/TestJsonParserMessage.java
index 4905d1f..8f09bbc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/TestJsonParserMessage.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/TestJsonParserMessage.java
@@ -44,7 +44,9 @@ public class TestJsonParserMessage extends BaseTestJsonParser {
         return false;
       }
       assertEquals(JsonToken.FIELD_NAME, tokenizer.requireNext());
-      assertEquals(JsonToken.START_ARRAY, tokenizer.requireNext());
+      JsonToken token = tokenizer.requireNext();
+      assertEquals(JsonToken.START_ARRAY, token);
+      tokenizer.unget(token);
       return true;
     }
 
@@ -65,10 +67,9 @@ public class TestJsonParserMessage extends BaseTestJsonParser {
     JsonParserFixture fixture = new JsonParserFixture();
     fixture.builder.messageParser(new MessageParserFixture());
     fixture.open(json);
-    assertTrue(fixture.next());
+    assertEquals(3, fixture.read());
     ValueListenerFixture a = fixture.field("a");
     assertEquals(JsonType.INTEGER, a.valueDef.type());
-    assertEquals(2, fixture.read());
     assertEquals(1, a.nullCount);
     assertEquals(100L, a.value);
     fixture.close();
@@ -89,17 +90,40 @@ public class TestJsonParserMessage extends BaseTestJsonParser {
     fixture.close();
   }
 
+  /**
+   * Test the case where the returned message has a single data
+   * object: <code>{ data: { ... } }</code>.
+   */
   @Test
-  public void testDataPath() {
+  public void testDataPathObject() {
     final String json =
-        "{ status: \"ok\", data: [{a: 0}, {a: 100}, {a: null}]}";
+        "{ status: \"ok\", data: {a: 100}}";
     JsonParserFixture fixture = new JsonParserFixture();
     fixture.builder.dataPath("data");
     fixture.open(json);
     assertTrue(fixture.next());
     ValueListenerFixture a = fixture.field("a");
     assertEquals(JsonType.INTEGER, a.valueDef.type());
-    assertEquals(2, fixture.read());
+    assertEquals(0, a.nullCount);
+    assertEquals(100L, a.value);
+    assertFalse(fixture.next());
+    fixture.close();
+  }
+
+  /**
+   * Test the case where the returned message has an array of
+   * objects: <code>{ data: [ { ... }, { ... } ... ] }</code>.
+   */
+  @Test
+  public void testDataPathArray() {
+    final String json =
+        "{ status: \"ok\", data: [{a: 0}, {a: 100}, {a: null}]}";
+    JsonParserFixture fixture = new JsonParserFixture();
+    fixture.builder.dataPath("data");
+    fixture.open(json);
+    assertEquals(3, fixture.read());
+    ValueListenerFixture a = fixture.field("a");
+    assertEquals(JsonType.INTEGER, a.valueDef.type());
     assertEquals(1, a.nullCount);
     assertEquals(100L, a.value);
     fixture.close();
@@ -124,12 +148,18 @@ public class TestJsonParserMessage extends BaseTestJsonParser {
     fixture.close();
   }
 
+  /**
+   * Test the case where the returned message has a null in place
+   * of the data: <code>{ data: null }</code>. This is harmlessly
+   * treated as no data and is needed for the case where the
+   * message normally returns a single object.
+   */
   @Test
   public void testDataPathNull() {
     final String json =
         "{ status: \"fail\", data: null}";
     JsonParserFixture fixture = new JsonParserFixture();
-    fixture.builder.messageParser(new MessageParserFixture());
+    fixture.builder.dataPath("data");
     fixture.open(json);
     assertFalse(fixture.next());
     fixture.close();
@@ -160,20 +190,4 @@ public class TestJsonParserMessage extends BaseTestJsonParser {
     }
     fixture.close();
   }
-
-  @Test
-  public void testDataPathErrorLeaf() {
-    final String json =
-        "{ status: \"bogus\", data: { notValid: \"must be array\"}}";
-    JsonParserFixture fixture = new JsonParserFixture();
-    fixture.builder.dataPath("data");
-    try {
-      fixture.open(json);
-      fail();
-    } catch (JsonErrorFixture e) {
-      assertTrue(e.errorType.equals("messageParseError"));
-      assertTrue(e.getCause() instanceof MessageParser.MessageContextException);
-    }
-    fixture.close();
-  }
 }