Posted to commits@calcite.apache.org by se...@apache.org on 2018/11/16 17:06:00 UTC

calcite git commit: [CALCITE-2651] Enable scrolling in ElasticSearch adapter for basic search queries

Repository: calcite
Updated Branches:
  refs/heads/master fcc8bf7f4 -> 614bc7b2b


[CALCITE-2651] Enable scrolling in ElasticSearch adapter for basic search queries

The only efficient way to fetch a full (large) result set from Elasticsearch is to use scrolling.

Note that behaviour remains unchanged for queries with aggregates and offsets
(like explicit pagination using `from`).
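
For context, the following is a minimal sketch (not part of this commit) of the scroll round-trips that the new ElasticsearchTransport/Scrolling classes perform, using only the low-level Elasticsearch RestClient and Jackson APIs that the adapter already depends on; the class and method names (ScrollSketch, fetchAllHits) and the 1m keep-alive value are illustrative assumptions, not the committed code.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ScrollSketch {
  /** Illustrative only: drains all hits of a query in batches via the scroll API. */
  static List<JsonNode> fetchAllHits(RestClient client, ObjectMapper mapper,
      String index, String type, String queryJson, int fetchSize) throws IOException {
    final List<JsonNode> hits = new ArrayList<>();

    // 1. initial search opens a scroll context kept alive for one minute
    final Map<String, String> params = new HashMap<>();
    params.put("scroll", "1m");
    params.put("size", Integer.toString(fetchSize));
    Response response = client.performRequest("POST",
        "/" + index + "/" + type + "/_search", params,
        new StringEntity(queryJson, ContentType.APPLICATION_JSON));

    String scrollId = null;
    while (true) {
      JsonNode result;
      try (InputStream is = response.getEntity().getContent()) {
        result = mapper.readTree(is);
      }
      scrollId = result.path("_scroll_id").asText(null);
      final JsonNode batch = result.at("/hits/hits");
      if (batch.size() == 0 || scrollId == null) {
        break; // no more results
      }
      batch.forEach(hits::add);

      // 2. fetch the next batch using the scroll id
      final String body = mapper.createObjectNode()
          .put("scroll", "1m")
          .put("scroll_id", scrollId)
          .toString();
      response = client.performRequest("POST", "/_search/scroll",
          Collections.emptyMap(),
          new StringEntity(body, ContentType.APPLICATION_JSON));
    }

    if (scrollId != null) {
      // 3. release the scroll context once all batches are consumed
      // (ES2 expects a JSON array of ids here; see ElasticsearchTransport#closeScroll)
      final String close = mapper.createObjectNode().put("scroll_id", scrollId).toString();
      client.performRequest("DELETE", "/_search/scroll",
          Collections.emptyMap(),
          new StringEntity(close, ContentType.APPLICATION_JSON));
    }
    return hits;
  }
}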


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/614bc7b2
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/614bc7b2
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/614bc7b2

Branch: refs/heads/master
Commit: 614bc7b2bffce28b26d7c52f12a670f4a13f264a
Parents: fcc8bf7
Author: Andrei Sereda <25...@users.noreply.github.com>
Authored: Wed Oct 31 22:35:33 2018 -0400
Committer: Andrei Sereda <25...@users.noreply.github.com>
Committed: Thu Nov 15 22:41:59 2018 -0500

----------------------------------------------------------------------
 elasticsearch/pom.xml                           |   5 +
 .../elasticsearch/ElasticsearchJson.java        |  13 +-
 .../elasticsearch/ElasticsearchSchema.java      |  24 +-
 .../elasticsearch/ElasticsearchTable.java       | 100 ++-----
 .../elasticsearch/ElasticsearchTransport.java   | 285 +++++++++++++++++++
 .../adapter/elasticsearch/Scrolling.java        | 176 ++++++++++++
 .../elasticsearch/ElasticSearchAdapterTest.java |  48 +++-
 .../EmbeddedElasticsearchPolicy.java            |  10 +-
 .../adapter/elasticsearch/ScrollingTest.java    | 125 ++++++++
 9 files changed, 688 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/614bc7b2/elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/elasticsearch/pom.xml b/elasticsearch/pom.xml
index 4700fee..ec9a0e0 100644
--- a/elasticsearch/pom.xml
+++ b/elasticsearch/pom.xml
@@ -87,6 +87,11 @@ limitations under the License.
       <version>${httpcore.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+      <version>${httpclient.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.elasticsearch</groupId>
       <artifactId>elasticsearch</artifactId>
       <version>${elasticsearch.version}</version>

http://git-wip-us.apache.org/repos/asf/calcite/blob/614bc7b2/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
index a05d2a4..8a6b011 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
@@ -41,6 +41,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
@@ -55,7 +56,7 @@ import static java.util.Collections.unmodifiableMap;
  * <p>Since we're using basic row-level rest client http response has to be
  * processed manually using JSON (jackson) library.
  */
-class ElasticsearchJson {
+final class ElasticsearchJson {
 
   /**
    * Used as special aggregation key for missing values (documents which are missing a field).
@@ -162,6 +163,7 @@ class ElasticsearchJson {
   static class Result {
     private final SearchHits hits;
     private final Aggregations aggregations;
+    private final String scrollId;
     private final long took;
 
     /**
@@ -172,9 +174,11 @@ class ElasticsearchJson {
     @JsonCreator
     Result(@JsonProperty("hits") SearchHits hits,
         @JsonProperty("aggregations") Aggregations aggregations,
+        @JsonProperty("_scroll_id") String scrollId,
         @JsonProperty("took") long took) {
       this.hits = Objects.requireNonNull(hits, "hits");
       this.aggregations = aggregations;
+      this.scrollId = scrollId;
       this.took = took;
     }
 
@@ -186,10 +190,14 @@ class ElasticsearchJson {
       return aggregations;
     }
 
-    public Duration took() {
+    Duration took() {
       return Duration.ofMillis(took);
     }
 
+    Optional<String> scrollId() {
+      return Optional.ofNullable(scrollId);
+    }
+
   }
 
   /**
@@ -625,6 +633,7 @@ class ElasticsearchJson {
     }
 
   }
+
 }
 
 // End ElasticsearchJson.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/614bc7b2/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
index 80a94be..888593b 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
@@ -21,6 +21,8 @@ import org.apache.calcite.schema.impl.AbstractSchema;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
 
@@ -52,6 +54,11 @@ public class ElasticsearchSchema extends AbstractSchema {
   private final Map<String, Table> tableMap;
 
   /**
+   * Default batch size to be used during scrolling.
+   */
+  private final int fetchSize;
+
+  /**
    * Allows schema to be instantiated from existing elastic search client.
    * This constructor is used in tests.
    * @param client existing client instance
@@ -63,10 +70,20 @@ public class ElasticsearchSchema extends AbstractSchema {
   }
 
   public ElasticsearchSchema(RestClient client, ObjectMapper mapper, String index, String type) {
+    this(client, mapper, index, type, ElasticsearchTransport.DEFAULT_FETCH_SIZE);
+  }
+
+  @VisibleForTesting
+  ElasticsearchSchema(RestClient client, ObjectMapper mapper,
+                      String index, String type,
+                      int fetchSize) {
     super();
     this.client = Objects.requireNonNull(client, "client");
     this.mapper = Objects.requireNonNull(mapper, "mapper");
     this.index = Objects.requireNonNull(index, "index");
+    Preconditions.checkArgument(fetchSize > 0,
+        "invalid fetch size. Expected %s > 0", fetchSize);
+    this.fetchSize = fetchSize;
     if (type == null) {
       try {
         this.tableMap = createTables(listTypesFromElastic());
@@ -76,6 +93,7 @@ public class ElasticsearchSchema extends AbstractSchema {
     } else {
       this.tableMap = createTables(Collections.singleton(type));
     }
+
   }
 
   @Override protected Map<String, Table> getTableMap() {
@@ -85,7 +103,9 @@ public class ElasticsearchSchema extends AbstractSchema {
   private Map<String, Table> createTables(Iterable<String> types) {
     final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
     for (String type : types) {
-      builder.put(type, new ElasticsearchTable(client, mapper, index, type));
+      final ElasticsearchTransport transport = new ElasticsearchTransport(client, mapper,
+          index, type, fetchSize);
+      builder.put(type, new ElasticsearchTable(transport));
     }
     return builder.build();
   }
@@ -101,7 +121,7 @@ public class ElasticsearchSchema extends AbstractSchema {
     final String endpoint = "/" + index + "/_mapping";
     final Response response = client.performRequest("GET", endpoint);
     try (InputStream is = response.getEntity().getContent()) {
-      JsonNode root = mapper.readTree(is);
+      final JsonNode root = mapper.readTree(is);
       if (!root.isObject() || root.size() != 1) {
         final String message = String.format(Locale.ROOT, "Invalid response for %s/%s "
             + "Expected object of size 1 got %s (of size %d)", response.getHost(),

http://git-wip-us.apache.org/repos/asf/calcite/blob/614bc7b2/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
index 05f3136..f9565ec 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
@@ -29,30 +29,20 @@ import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.runtime.Hook;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.TranslatableTable;
 import org.apache.calcite.schema.impl.AbstractTableQueryable;
 import org.apache.calcite.sql.type.SqlTypeName;
 
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpStatus;
-import org.apache.http.entity.ContentType;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.util.EntityUtils;
-
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
-import org.elasticsearch.client.Response;
-import org.elasticsearch.client.RestClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -60,7 +50,6 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -79,50 +68,22 @@ public class ElasticsearchTable extends AbstractQueryableTable implements Transl
    */
   private static final String AGGREGATIONS = "aggregations";
 
-  private final RestClient restClient;
   private final ElasticsearchVersion version;
   private final String indexName;
   private final String typeName;
   final ObjectMapper mapper;
+  private final ElasticsearchTransport transport;
 
   /**
    * Creates an ElasticsearchTable.
-   * @param client low-level ES rest client
-   * @param mapper Jackson API
-   * @param indexName elastic search index
-   * @param typeName elastic searh index type
    */
-  ElasticsearchTable(RestClient client, ObjectMapper mapper, String indexName, String typeName) {
+  ElasticsearchTable(ElasticsearchTransport transport) {
     super(Object[].class);
-    this.restClient = Objects.requireNonNull(client, "client");
-    try {
-      this.version = detectVersion(client, mapper);
-    } catch (IOException e) {
-      final String message = String.format(Locale.ROOT, "Couldn't detect ES version "
-          + "for %s/%s", indexName, typeName);
-      throw new UncheckedIOException(message, e);
-    }
-    this.indexName = Objects.requireNonNull(indexName, "indexName");
-    this.typeName = Objects.requireNonNull(typeName, "typeName");
-    this.mapper = Objects.requireNonNull(mapper, "mapper");
-
-  }
-
-  /**
-   * Detects current Elastic Search version by connecting to a existing instance.
-   * It is a {@code GET} request to {@code /}. Returned JSON has server information
-   * (including version).
-   *
-   * @param client low-level rest client connected to ES instance
-   * @param mapper Jackson mapper instance used to parse responses
-   * @return parsed version from ES, or {@link ElasticsearchVersion#UNKNOWN}
-   * @throws IOException if couldn't connect to ES
-   */
-  private static ElasticsearchVersion detectVersion(RestClient client, ObjectMapper mapper)
-      throws IOException {
-    HttpEntity entity = client.performRequest("GET", "/").getEntity();
-    JsonNode node = mapper.readTree(EntityUtils.toString(entity));
-    return ElasticsearchVersion.fromString(node.get("version").get("number").asText());
+    this.transport = Objects.requireNonNull(transport, "transport");
+    this.version = transport.version;
+    this.indexName = transport.indexName;
+    this.typeName = transport.typeName;
+    this.mapper = transport.mapper();
   }
 
   /**
@@ -154,7 +115,7 @@ public class ElasticsearchTable extends AbstractQueryableTable implements Transl
    * @param aggregations aggregation functions
    * @return Enumerator of results
    */
-  protected Enumerable<Object> find(List<String> ops,
+  private Enumerable<Object> find(List<String> ops,
       List<Map.Entry<String, Class>> fields,
       List<Map.Entry<String, RelFieldCollation.Direction>> sort,
       List<String> groupBy,
@@ -188,14 +149,19 @@ public class ElasticsearchTable extends AbstractQueryableTable implements Transl
       query.put("size", fetch);
     }
 
-    try {
-      ElasticsearchJson.Result search = httpRequest(query);
-      final Function1<ElasticsearchJson.SearchHit, Object> getter =
-          ElasticsearchEnumerators.getter(fields);
-      return Linq4j.asEnumerable(search.searchHits().hits()).select(getter);
-    } catch (IOException e) {
-      throw new UncheckedIOException(e);
+    final Function1<ElasticsearchJson.SearchHit, Object> getter =
+        ElasticsearchEnumerators.getter(fields);
+
+    Iterable<ElasticsearchJson.SearchHit> iter;
+    if (offset == null) {
+      // apply scrolling when there is no offset
+      iter = () -> new Scrolling(transport).query(query);
+    } else {
+      final ElasticsearchJson.Result search = transport.search().apply(query);
+      iter = () -> search.searchHits().hits().iterator();
     }
+
+    return Linq4j.asEnumerable(iter).select(getter);
   }
 
   private Enumerable<Object> aggregate(List<String> ops,
@@ -281,7 +247,7 @@ public class ElasticsearchTable extends AbstractQueryableTable implements Transl
     }
     ((ObjectNode) agg).remove(AGGREGATIONS);
 
-    ElasticsearchJson.Result res = httpRequest(query);
+    ElasticsearchJson.Result res = transport.search(Collections.emptyMap()).apply(query);
 
     final List<Map<String, Object>> result = new ArrayList<>();
     if (res.aggregations() != null) {
@@ -320,30 +286,6 @@ public class ElasticsearchTable extends AbstractQueryableTable implements Transl
     return Linq4j.asEnumerable(hits.hits()).select(getter);
   }
 
-  private ElasticsearchJson.Result httpRequest(ObjectNode query) throws IOException {
-    Objects.requireNonNull(query, "query");
-    String uri = String.format(Locale.ROOT, "/%s/%s/_search", indexName, typeName);
-
-    Hook.QUERY_PLAN.run(query);
-    final String json = mapper.writeValueAsString(query);
-
-    LOGGER.debug("Elasticsearch Query: {}", json);
-
-    HttpEntity entity = new StringEntity(json, ContentType.APPLICATION_JSON);
-    Response response = restClient.performRequest("POST", uri, Collections.emptyMap(), entity);
-    if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
-      final String error = EntityUtils.toString(response.getEntity());
-      final String message = String.format(Locale.ROOT,
-          "Error while querying Elastic (on %s/%s) status: %s\nQuery:\n%s\nError:\n%s\n",
-          response.getHost(), response.getRequestLine(), response.getStatusLine(), query, error);
-      throw new RuntimeException(message);
-    }
-
-    try (InputStream is = response.getEntity().getContent()) {
-      return mapper.readValue(is, ElasticsearchJson.Result.class);
-    }
-  }
-
   @Override public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
     final RelDataType mapType = relDataTypeFactory.createMapType(
         relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR),

http://git-wip-us.apache.org/repos/asf/calcite/blob/614bc7b2/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTransport.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTransport.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTransport.java
new file mode 100644
index 0000000..12173d8
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTransport.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.runtime.Hook;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpEntityEnclosingRequest;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.util.EntityUtils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.StreamSupport;
+
+/**
+ * Set of predefined functions for REST interaction with the Elasticsearch API. Performs
+ * HTTP requests and JSON (de)serialization.
+ */
+final class ElasticsearchTransport {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchTable.class);
+
+  static final int DEFAULT_FETCH_SIZE = 5196;
+
+  private final ObjectMapper mapper;
+  private final RestClient restClient;
+
+  final String indexName;
+  final String typeName;
+
+  final ElasticsearchVersion version;
+
+  /**
+   * Default batch size
+   * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html">Scrolling API</a>
+   */
+  final int fetchSize;
+
+  ElasticsearchTransport(final RestClient restClient,
+                         final ObjectMapper mapper,
+                         final String indexName,
+                         final String typeName,
+                         final int fetchSize) {
+    this.mapper = Objects.requireNonNull(mapper, "mapper");
+    this.restClient = Objects.requireNonNull(restClient, "restClient");
+    this.indexName = Objects.requireNonNull(indexName, "indexName");
+    this.typeName = Objects.requireNonNull(typeName, "typeName");
+    this.fetchSize = fetchSize;
+    this.version = version(); // cache version
+  }
+
+  RestClient restClient() {
+    return this.restClient;
+  }
+
+  /**
+   * Detects the current Elasticsearch version by connecting to an existing instance.
+   * It is a {@code GET} request to {@code /}. Returned JSON has server information
+   * (including version).
+   *
+   * @return parsed version from ES, or {@link ElasticsearchVersion#UNKNOWN}
+   */
+  private ElasticsearchVersion version() {
+    final HttpRequest request = new HttpGet("/");
+    // version extract function
+    final Function<ObjectNode, ElasticsearchVersion> fn = node -> ElasticsearchVersion.fromString(
+        node.get("version").get("number").asText());
+    return rawHttp(ObjectNode.class)
+        .andThen(fn)
+        .apply(request);
+  }
+
+  ObjectMapper mapper() {
+    return mapper;
+  }
+
+  Function<HttpRequest, Response> rawHttp() {
+    return new HttpFunction(restClient);
+  }
+
+  <T> Function<HttpRequest, T> rawHttp(Class<T> responseType) {
+    Objects.requireNonNull(responseType, "responseType");
+    return rawHttp().andThen(new JsonParserFn<>(mapper, responseType));
+  }
+
+  /**
+   * Fetches search results given a scrollId.
+   */
+  Function<String, ElasticsearchJson.Result> scroll() {
+    return scrollId -> {
+      // fetch next scroll
+      final HttpPost request = new HttpPost(URI.create("/_search/scroll"));
+      final ObjectNode payload = mapper.createObjectNode()
+          .put("scroll", "1m")
+          .put("scroll_id", scrollId);
+
+      try {
+        final String json = mapper.writeValueAsString(payload);
+        request.setEntity(new StringEntity(json, ContentType.APPLICATION_JSON));
+        return rawHttp(ElasticsearchJson.Result.class).apply(request);
+      } catch (IOException e) {
+        String message = String.format(Locale.ROOT, "Couldn't fetch next scroll %s", scrollId);
+        throw new UncheckedIOException(message, e);
+      }
+    };
+
+  }
+
+  void closeScroll(Iterable<String> scrollIds) {
+    Objects.requireNonNull(scrollIds, "scrollIds");
+
+    // delete current scroll
+    final URI uri = URI.create("/_search/scroll");
+    // http DELETE with payload
+    final HttpEntityEnclosingRequestBase request = new HttpEntityEnclosingRequestBase() {
+      @Override public String getMethod() {
+        return HttpDelete.METHOD_NAME;
+      }
+    };
+
+    request.setURI(uri);
+    final ObjectNode payload = mapper().createObjectNode();
+    // ES2 expects json array for DELETE scroll API
+    final ArrayNode array = payload.withArray("scroll_id");
+
+    StreamSupport.stream(scrollIds.spliterator(), false)
+        .map(TextNode::new)
+        .forEach(array::add);
+
+    try {
+      final String json = mapper().writeValueAsString(payload);
+      request.setEntity(new StringEntity(json, ContentType.APPLICATION_JSON));
+      rawHttp().apply(request);
+    } catch (IOException | UncheckedIOException e) {
+      LOGGER.warn("Failed to close scroll(s): {}", scrollIds, e);
+    }
+  }
+
+  Function<ObjectNode, ElasticsearchJson.Result> search() {
+    return search(Collections.emptyMap());
+  }
+
+  /**
+   * Search request using HTTP post.
+   */
+  Function<ObjectNode, ElasticsearchJson.Result> search(final Map<String, String> httpParams) {
+    Objects.requireNonNull(httpParams, "httpParams");
+    return query -> {
+      Hook.QUERY_PLAN.run(query);
+      String path = String.format(Locale.ROOT, "/%s/%s/_search", indexName, typeName);
+      final HttpPost post;
+      try {
+        URIBuilder builder = new URIBuilder(path);
+        httpParams.forEach(builder::addParameter);
+        post = new HttpPost(builder.build());
+        final String json = mapper.writeValueAsString(query);
+        LOGGER.debug("Elasticsearch Query: {}", json);
+        post.setEntity(new StringEntity(json, ContentType.APPLICATION_JSON));
+      } catch (URISyntaxException e) {
+        throw new RuntimeException(e);
+      } catch (JsonProcessingException e) {
+        throw new UncheckedIOException(e);
+      }
+
+      return rawHttp(ElasticsearchJson.Result.class).apply(post);
+    };
+  }
+
+  /**
+   * Parses HTTP response into some class using jackson API.
+   * @param <T> result type
+   */
+  private static class JsonParserFn<T> implements Function<Response, T> {
+    private final ObjectMapper mapper;
+    private final Class<T> klass;
+
+    JsonParserFn(final ObjectMapper mapper, final Class<T> klass) {
+      this.mapper = mapper;
+      this.klass = klass;
+    }
+
+    @Override public T apply(final Response response) {
+      try (InputStream is = response.getEntity().getContent()) {
+        return mapper.readValue(is, klass);
+      } catch (IOException e) {
+        final String message = String.format(Locale.ROOT,
+            "Couldn't parse HTTP response %s into %s", response, klass);
+        throw new UncheckedIOException(message, e);
+      }
+    }
+  }
+
+  /**
+   * Basic rest operations interacting with elastic cluster.
+   */
+  private static class HttpFunction implements Function<HttpRequest, Response> {
+
+    private final RestClient restClient;
+
+    HttpFunction(final RestClient restClient) {
+      this.restClient = Objects.requireNonNull(restClient, "restClient");
+    }
+
+    @Override public Response apply(final HttpRequest request) {
+      try {
+        return applyInternal(request);
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
+
+    private Response applyInternal(final HttpRequest request)
+        throws IOException  {
+
+      Objects.requireNonNull(request, "request");
+      final HttpEntity entity = request instanceof HttpEntityEnclosingRequest
+          ? ((HttpEntityEnclosingRequest) request).getEntity() : null;
+
+      final Response response = restClient.performRequest(
+          request.getRequestLine().getMethod(),
+          request.getRequestLine().getUri(),
+          Collections.emptyMap(),
+          entity);
+
+      final String payload = entity != null && entity.isRepeatable()
+          ? EntityUtils.toString(entity) : "<empty>";
+
+      if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+        final String error = EntityUtils.toString(response.getEntity());
+
+        final String message = String.format(Locale.ROOT,
+            "Error while querying Elastic (on %s/%s) status: %s\nPayload:\n%s\nError:\n%s\n",
+            response.getHost(), response.getRequestLine(),
+            response.getStatusLine(), payload, error);
+        throw new RuntimeException(message);
+      }
+
+      return response;
+    }
+  }
+}
+
+// End ElasticsearchTransport.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/614bc7b2/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/Scrolling.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/Scrolling.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/Scrolling.java
new file mode 100644
index 0000000..f947ae5
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/Scrolling.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.calcite.adapter.elasticsearch;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.AbstractSequentialIterator;
+import com.google.common.collect.Iterators;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.function.Consumer;
+
+/**
+ * <p>"Iterator" which retrieves results lazily and in batches. Uses
+ * <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html">Elastic Scrolling API</a>
+ * to optimally consume large search results.
+ *
+ * <p>This class is <strong>not thread safe</strong>.
+ */
+class Scrolling {
+
+  private final ElasticsearchTransport transport;
+  private final int fetchSize;
+
+  Scrolling(ElasticsearchTransport transport) {
+    this.transport = Objects.requireNonNull(transport, "transport");
+    final int fetchSize = transport.fetchSize;
+    Preconditions.checkArgument(fetchSize > 0,
+        "invalid fetch size. Expected %s > 0", fetchSize);
+    this.fetchSize = fetchSize;
+  }
+
+  Iterator<ElasticsearchJson.SearchHit> query(ObjectNode query) {
+    Objects.requireNonNull(query, "query");
+    final long limit;
+    if (query.has("size")) {
+      limit = query.get("size").asLong();
+      if (fetchSize > limit) {
+        // don't use scrolling when batch size is greater than limit
+        return transport.search().apply(query).searchHits().hits().iterator();
+      }
+    } else {
+      limit = Long.MAX_VALUE;
+    }
+
+    query.put("size", fetchSize);
+    final ElasticsearchJson.Result first = transport
+        .search(Collections.singletonMap("scroll", "1m")).apply(query);
+
+    AutoClosingIterator iterator = new AutoClosingIterator(
+        new SequentialIterator(first, transport, limit),
+        scrollId -> transport.closeScroll(Collections.singleton(scrollId)));
+
+    Iterator<ElasticsearchJson.SearchHit> result = flatten(iterator);
+    // apply limit
+    if (limit != Long.MAX_VALUE) {
+      result = Iterators.limit(result, (int) limit);
+    }
+
+    return result;
+  }
+
+  /**
+   * Combines lazily multiple {@link ElasticsearchJson.Result} into a single iterator of
+   * {@link ElasticsearchJson.SearchHit}.
+   */
+  private static Iterator<ElasticsearchJson.SearchHit> flatten(
+      Iterator<ElasticsearchJson.Result> results) {
+    final Iterator<Iterator<ElasticsearchJson.SearchHit>> inputs = Iterators.transform(results,
+        input -> input.searchHits().hits().iterator());
+    return Iterators.concat(inputs);
+  }
+
+  /**
+   * Observes when existing iterator has ended and clears context (scroll) if any.
+   */
+  private static class AutoClosingIterator implements Iterator<ElasticsearchJson.Result>,
+      AutoCloseable {
+    private final Iterator<ElasticsearchJson.Result> delegate;
+    private final Consumer<String> closer;
+
+    /**
+     * Was the {@link #closer} consumer already called?
+     */
+    private boolean closed;
+
+    /**
+     * Keeps last value of {@code scrollId} in memory so scroll can be released upon termination
+     */
+    private String scrollId;
+
+    private AutoClosingIterator(
+        final Iterator<ElasticsearchJson.Result> delegate,
+        final Consumer<String> closer) {
+      this.delegate = delegate;
+      this.closer = closer;
+    }
+
+    @Override public void close() {
+      if (!closed && scrollId != null) {
+        // close once (if scrollId is present)
+        closer.accept(scrollId);
+      }
+      closed = true;
+    }
+
+    @Override public boolean hasNext() {
+      final boolean hasNext = delegate.hasNext();
+      if (!hasNext) {
+        close();
+      }
+      return hasNext;
+    }
+
+    @Override public ElasticsearchJson.Result next() {
+      ElasticsearchJson.Result next = delegate.next();
+      next.scrollId().ifPresent(id -> scrollId = id);
+      return next;
+    }
+  }
+
+  /**
+   * Iterator which consumes current {@code scrollId} until full search result is fetched
+   * or {@code limit} is reached.
+   */
+  private static class SequentialIterator
+      extends AbstractSequentialIterator<ElasticsearchJson.Result> {
+
+    private final ElasticsearchTransport transport;
+    private final long limit;
+    private long count;
+
+    private SequentialIterator(final ElasticsearchJson.Result first,
+        final ElasticsearchTransport transport, final long limit) {
+      super(first);
+      this.transport = transport;
+      Preconditions.checkArgument(limit >= 0,
+          "limit: %s >= 0", limit);
+      this.limit = limit;
+    }
+
+    @Override protected ElasticsearchJson.Result computeNext(
+        final ElasticsearchJson.Result previous) {
+      final int hits = previous.searchHits().hits().size();
+      if (hits == 0 || count >= limit) {
+        // stop (re-)requesting when limit is reached or no more results
+        return null;
+      }
+
+      count += hits;
+      final String scrollId = previous.scrollId()
+          .orElseThrow(() -> new IllegalStateException("scrollId has to be present"));
+
+      return transport.scroll().apply(scrollId);
+    }
+  }
+}
+
+// End Scrolling.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/614bc7b2/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java
index 4895df8..73cf6bf 100644
--- a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java
+++ b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java
@@ -37,12 +37,15 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.function.Consumer;
 
 /**
  * Set of tests for ES adapter. Uses real instance via {@link EmbeddedElasticsearchPolicy}. Document
@@ -55,6 +58,7 @@ public class ElasticSearchAdapterTest {
 
   /** Default index/type name */
   private static final String ZIPS = "zips";
+  private static final int ZIPS_SIZE = 149;
 
   /**
    * Used to create {@code zips} index and insert zip data in bulk.
@@ -177,11 +181,6 @@ public class ElasticSearchAdapterTest {
         .query("select * from elastic.zips where _MAP['CITY'] = 'BROOKLYN'")
         .returnsCount(0);
 
-    // limit works
-    CalciteAssert.that()
-        .with(newConnectionFactory())
-        .query("select * from elastic.zips limit 42")
-        .returnsCount(42);
 
     // limit 0
     CalciteAssert.that()
@@ -196,9 +195,31 @@ public class ElasticSearchAdapterTest {
         + "    ElasticsearchProject(city=[CAST(ITEM($0, 'city')):VARCHAR(20) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\"], longitude=[CAST(ITEM(ITEM($0, 'loc'), 0)):FLOAT], latitude=[CAST(ITEM(ITEM($0, 'loc'), 1)):FLOAT], pop=[CAST(ITEM($0, 'pop')):INTEGER], state=[CAST(ITEM($0, 'state')):VARCHAR(2) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\"], id=[CAST(ITEM($0, 'id')):VARCHAR(5) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\"])\n"
         + "      ElasticsearchTableScan(table=[[elastic, zips]])";
 
+    Consumer<ResultSet> checker = rset -> {
+      try {
+        final List<String> states = new ArrayList<>();
+        while (rset.next()) {
+          states.add(rset.getString("state"));
+        }
+        for (int i = 0; i < states.size() - 1; i++) {
+          String current = states.get(i);
+          String next = states.get(i + 1);
+          if (current.compareTo(next) > 0) {
+            final String message = String.format(Locale.ROOT,
+                "Not sorted: %s (index:%d) > %s (index:%d) count: %d",
+                current, i, next, i + 1, states.size());
+            throw new AssertionError(message);
+          }
+        }
+      } catch (SQLException e) {
+        throw new RuntimeException(e);
+      }
+    };
+
     calciteAssert()
         .query("select * from zips order by state")
-        .returnsCount(10)
+        .returnsCount(ZIPS_SIZE)
+        .returns(checker)
         .explainContains(explain);
   }
 
@@ -305,7 +326,8 @@ public class ElasticSearchAdapterTest {
                     + "pop:{script: 'params._source.pop'}, "
                     + "state:{script: 'params._source.state'}, "
                     + "id:{script: 'params._source.id'}}",
-                "sort: [ {state: 'asc'}, {pop: 'asc'}]"))
+                "sort: [ {state: 'asc'}, {pop: 'asc'}]",
+                String.format(Locale.ROOT, "size:%s", ElasticsearchTransport.DEFAULT_FETCH_SIZE)))
         .explainContains(explain);
   }
 
@@ -323,14 +345,15 @@ public class ElasticSearchAdapterTest {
 
   @Test public void testInPlan() {
     final String[] searches = {
-        "'query' : {'constant_score':{filter:{bool:{should:"
+        "query: {'constant_score':{filter:{bool:{should:"
             + "[{term:{pop:96074}},{term:{pop:99568}}]}}}}",
-        "'script_fields': {longitude:{script:'params._source.loc[0]'}, "
+        "script_fields: {longitude:{script:'params._source.loc[0]'}, "
             +  "latitude:{script:'params._source.loc[1]'}, "
             +  "city:{script: 'params._source.city'}, "
             +  "pop:{script: 'params._source.pop'}, "
             +  "state:{script: 'params._source.state'}, "
-            +  "id:{script: 'params._source.id'}}"
+            +  "id:{script: 'params._source.id'}}",
+        String.format(Locale.ROOT, "size:%d", ElasticsearchTransport.DEFAULT_FETCH_SIZE)
     };
 
     calciteAssert()
@@ -344,7 +367,7 @@ public class ElasticSearchAdapterTest {
   @Test public void testZips() {
     calciteAssert()
         .query("select state, city from zips")
-        .returnsCount(10);
+        .returnsCount(ZIPS_SIZE);
   }
 
   @Test public void testProject() {
@@ -362,7 +385,8 @@ public class ElasticSearchAdapterTest {
                     + "{zero:{script:'0'},"
                     + "state:{script:'params._source.state'},"
                     + "city:{script:'params._source.city'}}",
-                "sort:[{state:'asc'},{city:'asc'}]"));
+                "sort:[{state:'asc'},{city:'asc'}]",
+                String.format(Locale.ROOT, "size:%d", ElasticsearchTransport.DEFAULT_FETCH_SIZE)));
   }
 
   @Test public void testFilter() {

http://git-wip-us.apache.org/repos/asf/calcite/blob/614bc7b2/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/EmbeddedElasticsearchPolicy.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/EmbeddedElasticsearchPolicy.java b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/EmbeddedElasticsearchPolicy.java
index bac2e6f..8089057 100644
--- a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/EmbeddedElasticsearchPolicy.java
+++ b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/EmbeddedElasticsearchPolicy.java
@@ -197,14 +197,18 @@ class EmbeddedElasticsearchPolicy extends ExternalResource {
     if (client != null) {
       return client;
     }
-    TransportAddress address = httpAddress();
-    RestClient client = RestClient.builder(new HttpHost(address.getAddress(), address.getPort()))
-            .build();
+
+    final RestClient client = RestClient.builder(httpHost()).build();
     closer.add(client);
     this.client = client;
     return client;
   }
 
+  HttpHost httpHost() {
+    final TransportAddress address = httpAddress();
+    return new HttpHost(address.getAddress(), address.getPort());
+  }
+
   /**
    * HTTP address for rest clients (can be ES native or any other).
    * @return http address to connect to

http://git-wip-us.apache.org/repos/asf/calcite/blob/614bc7b2/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ScrollingTest.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ScrollingTest.java b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ScrollingTest.java
new file mode 100644
index 0000000..5beec12
--- /dev/null
+++ b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ScrollingTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.test.CalciteAssert;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.elasticsearch.client.Response;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.IntStream;
+
+/**
+ * Tests usage of the scrolling API: correct results and resource cleanup
+ * (the scroll is deleted after the scan).
+ */
+public class ScrollingTest {
+
+  @ClassRule
+  public static final EmbeddedElasticsearchPolicy NODE = EmbeddedElasticsearchPolicy.create();
+
+  private static final String NAME = "scroll";
+  private static final int SIZE = 10;
+
+  @BeforeClass
+  public static void setupInstance() throws Exception {
+    NODE.createIndex(NAME, Collections.singletonMap("value", "long"));
+    final List<ObjectNode> docs = new ArrayList<>();
+    for (int i = 0; i < SIZE; i++) {
+      String json = String.format(Locale.ROOT, "{\"value\": %d}", i);
+      docs.add((ObjectNode) NODE.mapper().readTree(json));
+    }
+    NODE.insertBulk(NAME, docs);
+  }
+
+  private CalciteAssert.ConnectionFactory newConnectionFactory(int fetchSize) {
+    return new CalciteAssert.ConnectionFactory() {
+      @Override public Connection createConnection() throws SQLException {
+        final Connection connection = DriverManager.getConnection("jdbc:calcite:");
+        final SchemaPlus root = connection.unwrap(CalciteConnection.class).getRootSchema();
+        ElasticsearchSchema schema = new ElasticsearchSchema(NODE.restClient(), NODE.mapper(),
+            NAME, null, fetchSize);
+        root.add("elastic", schema);
+        return connection;
+      }
+    };
+  }
+
+  @Test
+  public void scrolling() throws Exception {
+    final String[] expected = IntStream.range(0, SIZE).mapToObj(i -> "V=" + i)
+        .toArray(String[]::new);
+    final String query = String.format(Locale.ROOT, "select _MAP['value'] as v from "
+        + "\"elastic\".\"%s\"", NAME);
+
+    for (int fetchSize: Arrays.asList(1, 2, 3, SIZE / 2, SIZE - 1, SIZE, SIZE + 1, 2 * SIZE)) {
+      CalciteAssert.that()
+          .with(newConnectionFactory(fetchSize))
+          .query(query)
+          .returnsUnordered(expected);
+      assertNoActiveScrolls();
+    }
+  }
+
+  /**
+   * Ensures there are no pending scroll contexts in the Elasticsearch cluster.
+   * Queries {@code /_nodes/stats/indices/search} endpoint.
+   * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-stats.html">Indices Stats</a>
+   */
+  private void assertNoActiveScrolls() throws IOException  {
+    // get node stats
+    final Response response = NODE.restClient()
+        .performRequest("GET", "/_nodes/stats/indices/search");
+
+    try (InputStream is = response.getEntity().getContent()) {
+      final ObjectNode node = NODE.mapper().readValue(is, ObjectNode.class);
+      final String path = "/indices/search/scroll_current";
+      final JsonNode scrollCurrent = node.with("nodes").elements().next().at(path);
+      if (scrollCurrent.isMissingNode()) {
+        throw new IllegalStateException("Couldn't find node at " + path);
+      }
+
+      if (scrollCurrent.asInt() != 0) {
+        final String message = String.format(Locale.ROOT, "Expected no active scrolls "
+            + "but got %d. Current index stats %s", scrollCurrent.asInt(), node);
+        throw new AssertionError(message);
+      }
+    }
+  }
+
+
+}
+
+// End ScrollingTest.java