Posted to commits@beam.apache.org by jb...@apache.org on 2017/01/04 12:53:44 UTC

[1/2] beam git commit: [BEAM-425] Add ElasticsearchIO

Repository: beam
Updated Branches:
  refs/heads/master d1d85dfc7 -> 926ec8e80


[BEAM-425] Add ElasticsearchIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6412389a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6412389a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6412389a

Branch: refs/heads/master
Commit: 6412389a02ad0dbdac5cfe5425a8a56462071477
Parents: d1d85df
Author: Etienne Chauchot and Jean-Baptiste Onofré <ec...@apache.org>
Authored: Thu Oct 20 11:45:47 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed Jan 4 13:39:07 2017 +0100

----------------------------------------------------------------------
 sdks/java/io/elasticsearch/pom.xml              | 175 ++++
 .../sdk/io/elasticsearch/ElasticsearchIO.java   | 819 +++++++++++++++++++
 .../beam/sdk/io/elasticsearch/package-info.java |  20 +
 .../elasticsearch/ElasticSearchIOTestUtils.java | 129 +++
 .../io/elasticsearch/ElasticsearchIOTest.java   | 358 ++++++++
 sdks/java/io/pom.xml                            |   1 +
 6 files changed, 1502 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6412389a/sdks/java/io/elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/pom.xml b/sdks/java/io/elasticsearch/pom.xml
new file mode 100644
index 0000000..94e8c6c
--- /dev/null
+++ b/sdks/java/io/elasticsearch/pom.xml
@@ -0,0 +1,175 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-io-parent</artifactId>
+    <version>0.5.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-java-io-elasticsearch</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: IO :: Elasticsearch</name>
+  <description>IO to read and write on Elasticsearch.</description>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-javadoc-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+      <version>2.6.2</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.elasticsearch.client</groupId>
+      <artifactId>rest</artifactId>
+      <version>5.0.0</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpcore-nio</artifactId>
+      <version>4.4.5</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpcore</artifactId>
+      <version>4.4.5</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpasyncclient</artifactId>
+      <version>4.1.2</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+      <version>4.5.2</version>
+    </dependency>
+
+    <!-- compile dependencies -->
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- test -->
+    <dependency>
+      <groupId>org.elasticsearch</groupId>
+      <artifactId>elasticsearch</artifactId>
+      <version>2.4.1</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-core</artifactId>
+      <version>1.3</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-io</artifactId>
+      <version>1.3.2</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/6412389a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
new file mode 100644
index 0000000..5073834
--- /dev/null
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -0,0 +1,819 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.elasticsearch;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.message.BasicHeader;
+import org.apache.http.nio.entity.NStringEntity;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+
+/**
+ * Transforms for reading and writing data from/to Elasticsearch.
+ *
+ * <h3>Reading from Elasticsearch</h3>
+ *
+ * <p>{@link ElasticsearchIO#read ElasticsearchIO.read()} returns a bounded
+ * {@link PCollection PCollection&lt;String&gt;} representing JSON documents.
+ *
+ * <p>To configure the {@link ElasticsearchIO#read}, you have to provide a connection configuration
+ * containing the HTTP addresses of the instances, an index name, and a type. The following example
+ * illustrates options for configuring the source:
+ *
+ * <pre>{@code
+ *
+ * pipeline.apply(ElasticsearchIO.read().withConnectionConfiguration(
+ *    ElasticsearchIO.ConnectionConfiguration.create(
+ *        new String[]{"http://host:9200"}, "my-index", "my-type")));
+ *
+ * }</pre>
+ *
+ * <p>The connection configuration also accepts optional configuration: {@code withUsername()} and
+ * {@code withPassword()}.
+ *
+ * <p>You can also specify a query on the {@code read()} using {@code withQuery()}.
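+ *
+ * <p>For instance, a sketch of a read using authentication and a query (the credentials and the
+ * query below are illustrative placeholders only):
+ *
+ * <pre>{@code
+ *
+ * pipeline.apply(ElasticsearchIO.read()
+ *    .withConnectionConfiguration(
+ *       ElasticsearchIO.ConnectionConfiguration.create(
+ *           new String[]{"http://host:9200"}, "my-index", "my-type")
+ *         .withUsername("es-user")
+ *         .withPassword("es-password"))
+ *    .withQuery("{ \"query\": { \"match_all\": {} } }"));
+ *
+ * }</pre>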
+ *
+ * <h3>Writing to Elasticsearch</h3>
+ *
+ * <p>To write documents to Elasticsearch, use
+ * {@link ElasticsearchIO#write ElasticsearchIO.write()}, which writes JSON documents from a
+ * {@link PCollection PCollection&lt;String&gt;} (which can be bounded or unbounded).
+ *
+ * <p>To configure {@link ElasticsearchIO#write ElasticsearchIO.write()}, as for the read, you
+ * have to provide a connection configuration. For instance:
+ *
+ * <pre>{@code
+ *
+ *  pipeline
+ *    .apply(...)
+ *    .apply(ElasticsearchIO.write().withConnectionConfiguration(
+ *       ElasticsearchIO.ConnectionConfiguration.create(
+ *           new String[]{"http://host:9200"}, "my-index", "my-type")));
+ *
+ * }</pre>
+ *
+ * <p>Optionally, you can provide {@code withMaxBatchSize()} and {@code withMaxBatchSizeBytes()}
+ * to specify the size of the write batch in number of documents or in bytes.
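+ *
+ * <p>For instance, a sketch of a write with both batch limits set (the values below are
+ * illustrative only, not recommendations):
+ *
+ * <pre>{@code
+ *
+ *  pipeline
+ *    .apply(...)
+ *    .apply(ElasticsearchIO.write()
+ *       .withConnectionConfiguration(
+ *          ElasticsearchIO.ConnectionConfiguration.create(
+ *              new String[]{"http://host:9200"}, "my-index", "my-type"))
+ *       .withMaxBatchSize(500L)
+ *       .withMaxBatchSizeBytes(1024L * 1024L));
+ *
+ * }</pre>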
+ */
+public class ElasticsearchIO {
+
+  public static Read read() {
+    // default scrollKeepalive = 5m as an upper bound for the unpredictable time
+    // between 2 start/read calls
+    // default batchSize to 100 as recommended by the ES dev team as a safe value when dealing
+    // with big documents and still a good compromise for performance
+    return new AutoValue_ElasticsearchIO_Read.Builder()
+        .setScrollKeepalive("5m")
+        .setBatchSize(100L)
+        .build();
+  }
+
+  public static Write write() {
+    return new AutoValue_ElasticsearchIO_Write.Builder()
+        // advised default starting batch size in ES docs
+        .setMaxBatchSize(1000L)
+        // advised default starting batch size in ES docs
+        .setMaxBatchSizeBytes(5L * 1024L * 1024L)
+        .build();
+  }
+
+  private ElasticsearchIO() {}
+
+  private static JsonObject parseResponse(Response response) throws IOException {
+    InputStream content = response.getEntity().getContent();
+    InputStreamReader inputStreamReader = new InputStreamReader(content, "UTF-8");
+    JsonObject jsonObject = new Gson().fromJson(inputStreamReader, JsonObject.class);
+    return jsonObject;
+  }
+
+  /** A POJO describing a connection configuration to Elasticsearch. */
+  @AutoValue
+  public abstract static class ConnectionConfiguration implements Serializable {
+
+    abstract List<String> getAddresses();
+
+    @Nullable
+    abstract String getUsername();
+
+    @Nullable
+    abstract String getPassword();
+
+    abstract String getIndex();
+
+    abstract String getType();
+
+    abstract Builder builder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setAddresses(List<String> addresses);
+
+      abstract Builder setUsername(String username);
+
+      abstract Builder setPassword(String password);
+
+      abstract Builder setIndex(String index);
+
+      abstract Builder setType(String type);
+
+      abstract ConnectionConfiguration build();
+    }
+
+    /**
+     * Creates a new Elasticsearch connection configuration.
+     *
+     * @param addresses list of addresses of Elasticsearch nodes
+     * @param index the index toward which the requests will be issued
+     * @param type the document type toward which the requests will be issued
+     * @return the connection configuration object
+     */
+    public static ConnectionConfiguration create(String[] addresses, String index, String type) {
+      checkArgument(
+          addresses != null,
+          "ConnectionConfiguration.create(addresses, index, type) called with null address");
+      checkArgument(
+          addresses.length != 0,
+          "ConnectionConfiguration.create(addresses, "
+              + "index, type) called with empty addresses");
+      checkArgument(
+          index != null,
+          "ConnectionConfiguration.create(addresses, index, type) called with null index");
+      checkArgument(
+          type != null,
+          "ConnectionConfiguration.create(addresses, index, type) called with null type");
+      return new AutoValue_ElasticsearchIO_ConnectionConfiguration.Builder()
+          .setAddresses(Arrays.asList(addresses))
+          .setIndex(index)
+          .setType(type)
+          .build();
+    }
+
+    /**
+     * If Elasticsearch authentication is enabled, provide the username.
+     *
+     * @param username the username used to authenticate to Elasticsearch
+     * @return the {@link ConnectionConfiguration} object with username set
+     */
+    public ConnectionConfiguration withUsername(String username) {
+      checkArgument(
+          username != null,
+          "ConnectionConfiguration.create().withUsername(username) called with null username");
+      checkArgument(
+          !username.isEmpty(),
+          "ConnectionConfiguration.create().withUsername(username) called with empty username");
+      return builder().setUsername(username).build();
+    }
+
+    /**
+     * If Elasticsearch authentication is enabled, provide the password.
+     *
+     * @param password the password used to authenticate to Elasticsearch
+     * @return the {@link ConnectionConfiguration} object with password set
+     */
+    public ConnectionConfiguration withPassword(String password) {
+      checkArgument(
+          password != null,
+          "ConnectionConfiguration.create().withPassword(password) called with null password");
+      checkArgument(
+          !password.isEmpty(),
+          "ConnectionConfiguration.create().withPassword(password) called with empty password");
+      return builder().setPassword(password).build();
+    }
+
+    private void populateDisplayData(DisplayData.Builder builder) {
+      builder.add(DisplayData.item("address", getAddresses().toString()));
+      builder.add(DisplayData.item("index", getIndex()));
+      builder.add(DisplayData.item("type", getType()));
+      builder.addIfNotNull(DisplayData.item("username", getUsername()));
+    }
+
+    private RestClient createClient() throws MalformedURLException {
+      HttpHost[] hosts = new HttpHost[getAddresses().size()];
+      int i = 0;
+      for (String address : getAddresses()) {
+        URL url = new URL(address);
+        hosts[i] = new HttpHost(url.getHost(), url.getPort(), url.getProtocol());
+        i++;
+      }
+      RestClientBuilder restClientBuilder = RestClient.builder(hosts);
+      if (getUsername() != null) {
+        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+        credentialsProvider.setCredentials(
+            AuthScope.ANY, new UsernamePasswordCredentials(getUsername(), getPassword()));
+        restClientBuilder.setHttpClientConfigCallback(
+            new RestClientBuilder.HttpClientConfigCallback() {
+              public HttpAsyncClientBuilder customizeHttpClient(
+                  HttpAsyncClientBuilder httpAsyncClientBuilder) {
+                return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+              }
+            });
+      }
+      return restClientBuilder.build();
+    }
+  }
+
+  /** A {@link PTransform} reading data from Elasticsearch. */
+  @AutoValue
+  public abstract static class Read extends PTransform<PBegin, PCollection<String>> {
+
+    private static final long MAX_BATCH_SIZE = 10000L;
+
+    @Nullable
+    abstract ConnectionConfiguration getConnectionConfiguration();
+
+    @Nullable
+    abstract String getQuery();
+
+    abstract String getScrollKeepalive();
+
+    abstract long getBatchSize();
+
+    abstract Builder builder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setConnectionConfiguration(ConnectionConfiguration connectionConfiguration);
+
+      abstract Builder setQuery(String query);
+
+      abstract Builder setScrollKeepalive(String scrollKeepalive);
+
+      abstract Builder setBatchSize(long batchSize);
+
+      abstract Read build();
+    }
+
+    /**
+     * Provide the Elasticsearch connection configuration object.
+     *
+     * @param connectionConfiguration the Elasticsearch {@link ConnectionConfiguration} object
+     * @return the {@link Read} with connection configuration set
+     */
+    public Read withConnectionConfiguration(ConnectionConfiguration connectionConfiguration) {
+      checkArgument(
+          connectionConfiguration != null,
+          "ElasticsearchIO.read()"
+              + ".withConnectionConfiguration(configuration) called with null configuration");
+      return builder().setConnectionConfiguration(connectionConfiguration).build();
+    }
+
+    /**
+     * Provide a query used while reading from Elasticsearch.
+     *
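+     * <p>For instance, a minimal match query (illustrative only; the {@code scientist} field is
+     * the one used in this module's tests):
+     *
+     * <pre>{@code
+     * String query = "{\"query\": {\"match\": {\"scientist\": \"Einstein\"}}}";
+     * }</pre>
+     *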
+     * @param query the query. See <a
+     *     href="https://www.elastic.co/guide/en/elasticsearch/reference/2.4/query-dsl.html">Query
+     *     DSL</a>
+     * @return the {@link Read} object with query set
+     */
+    public Read withQuery(String query) {
+      checkArgument(
+          !Strings.isNullOrEmpty(query),
+          "ElasticsearchIO.read().withQuery(query) called" + " with null or empty query");
+      return builder().setQuery(query).build();
+    }
+
+    /**
+     * Provide a scroll keepalive. See the <a
+     * href="https://www.elastic.co/guide/en/elasticsearch/reference/2.4/search-request-scroll.html">scroll
+     * API</a>. Default is "5m". Change this only if you get "No search context found" errors.
+     *
+     * @param scrollKeepalive keepalive duration, e.g. "5m" for 5 minutes
+     * @return the {@link Read} with scroll keepalive set
+     */
+    public Read withScrollKeepalive(String scrollKeepalive) {
+      checkArgument(
+          scrollKeepalive != null && !scrollKeepalive.equals("0m"),
+          "ElasticsearchIO.read().withScrollKeepalive(keepalive) called"
+              + " with null or \"0m\" keepalive");
+      return builder().setScrollKeepalive(scrollKeepalive).build();
+    }
+
+    /**
+     * Provide a size for the scroll read. See the <a
+     * href="https://www.elastic.co/guide/en/elasticsearch/reference/2.4/search-request-scroll.html">
+     * scroll API</a>. Default is 100. Maximum is 10000. If documents are small, increasing the
+     * batch size might improve read performance. If documents are big, you might need to decrease
+     * the batch size.
+     *
+     * @param batchSize number of documents read in each scroll read
+     * @return the {@link Read} with batch size set
+     */
+    public Read withBatchSize(long batchSize) {
+      checkArgument(
+          batchSize > 0,
+          "ElasticsearchIO.read().withBatchSize(batchSize) called with a negative "
+              + "or equal to 0 value: %s",
+          batchSize);
+      checkArgument(
+          batchSize <= MAX_BATCH_SIZE,
+          "ElasticsearchIO.read().withBatchSize(batchSize) "
+              + "called with a too large value (over %s): %s",
+          MAX_BATCH_SIZE,
+          batchSize);
+      return builder().setBatchSize(batchSize).build();
+    }
+
+    @Override
+    public PCollection<String> expand(PBegin input) {
+      return input.apply(
+          org.apache.beam.sdk.io.Read.from(new BoundedElasticsearchSource(this, null)));
+    }
+
+    @Override
+    public void validate(PBegin input) {
+      checkState(
+          getConnectionConfiguration() != null,
+          "ElasticsearchIO.read() requires a connection configuration"
+              + " to be set via withConnectionConfiguration(configuration)");
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.addIfNotNull(DisplayData.item("query", getQuery()));
+      getConnectionConfiguration().populateDisplayData(builder);
+    }
+  }
+
+  /** A {@link BoundedSource} reading from Elasticsearch. */
+  @VisibleForTesting
+  static class BoundedElasticsearchSource extends BoundedSource<String> {
+
+    private final ElasticsearchIO.Read spec;
+    // shardPreference is the shard number where the source will read the documents
+    @Nullable private final String shardPreference;
+
+    BoundedElasticsearchSource(Read spec, @Nullable String shardPreference) {
+      this.spec = spec;
+      this.shardPreference = shardPreference;
+    }
+
+    @Override
+    public List<? extends BoundedSource<String>> splitIntoBundles(
+        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+      List<BoundedElasticsearchSource> sources = new ArrayList<>();
+
+      // 1. We split per shard:
+      // unfortunately, Elasticsearch 2.x doesn't provide a way to do parallel reads on a single
+      // shard. So we do not use desiredBundleSize because we cannot split shards.
+      // With the slice API in ES 5.0 we will be able to use desiredBundleSize.
+      // Basically we will just ask the slice API to return data
+      // in nbBundles = estimatedSize / desiredBundleSize chunks.
+      // So each Beam source will read around desiredBundleSize volume of data.
+
+      // 2. Primary and replica shards have the same shard_id, we filter primary
+      // to have one source for each shard_id. Even if we specify preference=shards:2,
+      // ES load balances (round robin) the request between primary shard 2 and replica shard 2.
+      // But, as each shard (replica or primary) is responsible for only one part of the data,
+      // there will be no duplicate.
+
+      JsonObject statsJson = getStats(true);
+      JsonObject shardsJson =
+          statsJson
+              .getAsJsonObject("indices")
+              .getAsJsonObject(spec.getConnectionConfiguration().getIndex())
+              .getAsJsonObject("shards");
+      Set<Map.Entry<String, JsonElement>> shards = shardsJson.entrySet();
+      for (Map.Entry<String, JsonElement> shardJson : shards) {
+        String shardId = shardJson.getKey();
+        JsonArray value = (JsonArray) shardJson.getValue();
+        boolean isPrimaryShard =
+            value
+                .get(0)
+                .getAsJsonObject()
+                .getAsJsonObject("routing")
+                .getAsJsonPrimitive("primary")
+                .getAsBoolean();
+        if (isPrimaryShard) {
+          sources.add(new BoundedElasticsearchSource(spec, shardId));
+        }
+      }
+      checkArgument(!sources.isEmpty(), "No primary shard found");
+      return sources;
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) throws IOException {
+      // we use indices stats API to estimate size and list the shards
+      // (https://www.elastic.co/guide/en/elasticsearch/reference/2.4/indices-stats.html)
+      // as Elasticsearch 2.x doesn't support any way to do parallel reads inside a shard,
+      // the estimated size in bytes is not really used in the split into bundles.
+      // However, we implement this method anyway as the runners can use it.
+      // NB: Elasticsearch 5.x now provides the slice API.
+      // (https://www.elastic.co/guide/en/elasticsearch/reference/5.0/search-request-scroll.html
+      // #sliced-scroll)
+      JsonObject statsJson = getStats(false);
+      JsonObject indexStats =
+          statsJson
+              .getAsJsonObject("indices")
+              .getAsJsonObject(spec.getConnectionConfiguration().getIndex())
+              .getAsJsonObject("primaries");
+      JsonObject store = indexStats.getAsJsonObject("store");
+      return store.getAsJsonPrimitive("size_in_bytes").getAsLong();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      spec.populateDisplayData(builder);
+      builder.addIfNotNull(DisplayData.item("shard", shardPreference));
+    }
+
+    @Override
+    public BoundedReader<String> createReader(PipelineOptions options) throws IOException {
+      return new BoundedElasticsearchReader(this);
+    }
+
+    @Override
+    public void validate() {
+      spec.validate(null);
+    }
+
+    @Override
+    public Coder<String> getDefaultOutputCoder() {
+      return StringUtf8Coder.of();
+    }
+
+    private JsonObject getStats(boolean shardLevel) throws IOException {
+      HashMap<String, String> params = new HashMap<>();
+      if (shardLevel) {
+        params.put("level", "shards");
+      }
+      String endpoint = String.format("/%s/_stats", spec.getConnectionConfiguration().getIndex());
+      try (RestClient restClient = spec.getConnectionConfiguration().createClient()) {
+        return parseResponse(
+            restClient.performRequest("GET", endpoint, params, new BasicHeader("", "")));
+      }
+    }
+  }
+
+  private static class BoundedElasticsearchReader extends BoundedSource.BoundedReader<String> {
+
+    private final BoundedElasticsearchSource source;
+
+    private RestClient restClient;
+    private String current;
+    private String scrollId;
+    private ListIterator<String> batchIterator;
+
+    private BoundedElasticsearchReader(BoundedElasticsearchSource source) {
+      this.source = source;
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      restClient = source.spec.getConnectionConfiguration().createClient();
+
+      String query = source.spec.getQuery();
+      if (query == null) {
+        query = "{ \"query\": { \"match_all\": {} } }";
+      }
+
+      Response response;
+      String endPoint =
+          String.format(
+              "/%s/%s/_search",
+              source.spec.getConnectionConfiguration().getIndex(),
+              source.spec.getConnectionConfiguration().getType());
+      Map<String, String> params = new HashMap<>();
+      params.put("scroll", source.spec.getScrollKeepalive());
+      params.put("size", String.valueOf(source.spec.getBatchSize()));
+      if (source.shardPreference != null) {
+        params.put("preference", "_shards:" + source.shardPreference);
+      }
+      HttpEntity queryEntity = new NStringEntity(query, ContentType.APPLICATION_JSON);
+      response =
+          restClient.performRequest("GET", endPoint, params, queryEntity, new BasicHeader("", ""));
+      JsonObject searchResult = parseResponse(response);
+      updateScrollId(searchResult);
+      return readNextBatchAndReturnFirstDocument(searchResult);
+    }
+
+    private void updateScrollId(JsonObject searchResult) {
+      scrollId = searchResult.getAsJsonPrimitive("_scroll_id").getAsString();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      if (batchIterator.hasNext()) {
+        current = batchIterator.next();
+        return true;
+      } else {
+        String requestBody =
+            String.format(
+                "{\"scroll\" : \"%s\",\"scroll_id\" : \"%s\"}",
+                source.spec.getScrollKeepalive(), scrollId);
+        HttpEntity scrollEntity = new NStringEntity(requestBody, ContentType.APPLICATION_JSON);
+        Response response =
+            restClient.performRequest(
+                "GET",
+                "/_search/scroll",
+                Collections.<String, String>emptyMap(),
+                scrollEntity,
+                new BasicHeader("", ""));
+        JsonObject searchResult = parseResponse(response);
+        updateScrollId(searchResult);
+        return readNextBatchAndReturnFirstDocument(searchResult);
+      }
+    }
+
+    private boolean readNextBatchAndReturnFirstDocument(JsonObject searchResult) {
+      //stop if no more data
+      JsonArray hits = searchResult.getAsJsonObject("hits").getAsJsonArray("hits");
+      if (hits.size() == 0) {
+        current = null;
+        batchIterator = null;
+        return false;
+      }
+      // build a new batch list to back the iterator
+      List<String> batch = new ArrayList<>();
+      for (JsonElement hit : hits) {
+        String document = hit.getAsJsonObject().getAsJsonObject("_source").toString();
+        batch.add(document);
+      }
+      batchIterator = batch.listIterator();
+      current = batchIterator.next();
+      return true;
+    }
+
+    @Override
+    public String getCurrent() throws NoSuchElementException {
+      if (current == null) {
+        throw new NoSuchElementException();
+      }
+      return current;
+    }
+
+    @Override
+    public void close() throws IOException {
+      // remove the scroll
+      String requestBody = String.format("{\"scroll_id\" : [\"%s\"]}", scrollId);
+      HttpEntity entity = new NStringEntity(requestBody, ContentType.APPLICATION_JSON);
+      try {
+        restClient.performRequest(
+            "DELETE",
+            "/_search/scroll",
+            Collections.<String, String>emptyMap(),
+            entity,
+            new BasicHeader("", ""));
+      } finally {
+        if (restClient != null) {
+          restClient.close();
+        }
+      }
+    }
+
+    @Override
+    public BoundedSource<String> getCurrentSource() {
+      return source;
+    }
+  }
+
+  /** A {@link PTransform} writing data to Elasticsearch. */
+  @AutoValue
+  public abstract static class Write extends PTransform<PCollection<String>, PDone> {
+
+    @Nullable
+    abstract ConnectionConfiguration getConnectionConfiguration();
+
+    abstract long getMaxBatchSize();
+
+    abstract long getMaxBatchSizeBytes();
+
+    abstract Builder builder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setConnectionConfiguration(ConnectionConfiguration connectionConfiguration);
+
+      abstract Builder setMaxBatchSize(long maxBatchSize);
+
+      abstract Builder setMaxBatchSizeBytes(long maxBatchSizeBytes);
+
+      abstract Write build();
+    }
+
+    /**
+     * Provide the Elasticsearch connection configuration object.
+     *
+     * @param connectionConfiguration the Elasticsearch {@link ConnectionConfiguration} object
+     * @return the {@link Write} with connection configuration set
+     */
+    public Write withConnectionConfiguration(ConnectionConfiguration connectionConfiguration) {
+      checkArgument(
+          connectionConfiguration != null,
+          "ElasticsearchIO.write()"
+              + ".withConnectionConfiguration(configuration) called with null configuration");
+      return builder().setConnectionConfiguration(connectionConfiguration).build();
+    }
+
+    /**
+     * Provide a maximum size in number of documents for the batch; see the bulk API
+     * (https://www.elastic.co/guide/en/elasticsearch/reference/2.4/docs-bulk.html). Default is 1000
+     * docs (following the Elasticsearch bulk size advice). See
+     * https://www.elastic.co/guide/en/elasticsearch/guide/current/bulk.html. Depending on the
+     * execution engine, the size of bundles may vary; this sets the maximum size. Change this if
+     * you need smaller Elasticsearch bulks.
+     *
+     * @param batchSize maximum batch size in number of documents
+     * @return the {@link Write} with connection batch size set
+     */
+    public Write withMaxBatchSize(long batchSize) {
+      checkArgument(
+          batchSize > 0,
+          "ElasticsearchIO.write()"
+              + ".withMaxBatchSize(batchSize) called with incorrect <= 0 value");
+      return builder().setMaxBatchSize(batchSize).build();
+    }
+
+    /**
+     * Provide a maximum size in bytes for the batch; see the bulk API
+     * (https://www.elastic.co/guide/en/elasticsearch/reference/2.4/docs-bulk.html). Default is 5MB
+     * (following the Elasticsearch bulk size advice). See
+     * https://www.elastic.co/guide/en/elasticsearch/guide/current/bulk.html. Depending on the
+     * execution engine, the size of bundles may vary; this sets the maximum size. Change this if
+     * you need smaller Elasticsearch bulks.
+     *
+     * @param batchSizeBytes maximum batch size in bytes
+     * @return the {@link Write} with connection batch size in bytes set
+     */
+    public Write withMaxBatchSizeBytes(long batchSizeBytes) {
+      checkArgument(
+          batchSizeBytes > 0,
+          "ElasticsearchIO.write()"
+              + ".withMaxBatchSizeBytes(batchSizeBytes) called with incorrect <= 0 value");
+      return builder().setMaxBatchSizeBytes(batchSizeBytes).build();
+    }
+
+    @Override
+    public void validate(PCollection<String> input) {
+      checkState(
+          getConnectionConfiguration() != null,
+          "ElasticsearchIO.write() requires a connection configuration"
+              + " to be set via withConnectionConfiguration(configuration)");
+    }
+
+    @Override
+    public PDone expand(PCollection<String> input) {
+      input.apply(ParDo.of(new WriteFn(this)));
+      return PDone.in(input.getPipeline());
+    }
+
+    @VisibleForTesting
+    static class WriteFn extends DoFn<String, Void> {
+
+      private final Write spec;
+
+      private transient RestClient restClient;
+      private ArrayList<String> batch;
+      private long currentBatchSizeBytes;
+
+      WriteFn(Write spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public void createClient() throws Exception {
+        restClient = spec.getConnectionConfiguration().createClient();
+      }
+
+      @StartBundle
+      public void startBundle(Context context) throws Exception {
+        batch = new ArrayList<>();
+        currentBatchSizeBytes = 0;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext context) throws Exception {
+        String document = context.element();
+        batch.add(String.format("{ \"index\" : {} }%n%s%n", document));
+        currentBatchSizeBytes += document.getBytes().length;
+        if (batch.size() >= spec.getMaxBatchSize()
+            || currentBatchSizeBytes >= spec.getMaxBatchSizeBytes()) {
+          finishBundle(context);
+        }
+      }
+
+      @FinishBundle
+      public void finishBundle(Context context) throws Exception {
+        if (batch.isEmpty()) {
+          return;
+        }
+        StringBuilder bulkRequest = new StringBuilder();
+        for (String json : batch) {
+          bulkRequest.append(json);
+        }
+        batch.clear();
+        currentBatchSizeBytes = 0;
+        Response response;
+        String endPoint =
+            String.format(
+                "/%s/%s/_bulk",
+                spec.getConnectionConfiguration().getIndex(),
+                spec.getConnectionConfiguration().getType());
+        HttpEntity requestBody =
+            new NStringEntity(bulkRequest.toString(), ContentType.APPLICATION_JSON);
+        response =
+            restClient.performRequest(
+                "POST",
+                endPoint,
+                Collections.<String, String>emptyMap(),
+                requestBody,
+                new BasicHeader("", ""));
+        JsonObject searchResult = parseResponse(response);
+        boolean errors = searchResult.getAsJsonPrimitive("errors").getAsBoolean();
+        if (errors) {
+          StringBuilder errorMessages =
+              new StringBuilder(
+                  "Error writing to Elasticsearch, some elements could not be inserted:");
+          JsonArray items = searchResult.getAsJsonArray("items");
+          //some items present in bulk might have errors, concatenate error messages
+          for (JsonElement item : items) {
+            JsonObject creationObject = item.getAsJsonObject().getAsJsonObject("create");
+            JsonObject error = creationObject.getAsJsonObject("error");
+            if (error != null) {
+              String type = error.getAsJsonPrimitive("type").getAsString();
+              String reason = error.getAsJsonPrimitive("reason").getAsString();
+              String docId = creationObject.getAsJsonPrimitive("_id").getAsString();
+              errorMessages.append(String.format("%nDocument id %s: %s (%s)", docId, reason, type));
+              JsonObject causedBy = error.getAsJsonObject("caused_by");
+              if (causedBy != null) {
+                String cbReason = causedBy.getAsJsonPrimitive("reason").getAsString();
+                String cbType = causedBy.getAsJsonPrimitive("type").getAsString();
+                errorMessages.append(String.format("%nCaused by: %s (%s)", cbReason, cbType));
+              }
+            }
+          }
+          throw new IOException(errorMessages.toString());
+        }
+      }
+
+      @Teardown
+      public void closeClient() throws Exception {
+        if (restClient != null) {
+          restClient.close();
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/6412389a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/package-info.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/package-info.java
new file mode 100644
index 0000000..396705b
--- /dev/null
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Transforms for reading and writing from Elasticsearch. */
+package org.apache.beam.sdk.io.elasticsearch;

http://git-wip-us.apache.org/repos/asf/beam/blob/6412389a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
new file mode 100644
index 0000000..9a121f8
--- /dev/null
+++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.elasticsearch;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
+import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.IndicesAdminClient;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.index.IndexNotFoundException;
+
+/** Test utilities to use with {@link ElasticsearchIO}. */
+class ElasticSearchIOTestUtils {
+
+  /** Enumeration that specifies whether to insert malformed documents. */
+  enum InjectionMode {
+    INJECT_SOME_INVALID_DOCS,
+    DO_NOT_INJECT_INVALID_DOCS;
+  }
+
+  /** Deletes the given index synchronously. */
+  static void deleteIndex(String index, Client client) throws Exception {
+    IndicesAdminClient indices = client.admin().indices();
+    IndicesExistsResponse indicesExistsResponse =
+        indices.exists(new IndicesExistsRequest(index)).get();
+    if (indicesExistsResponse.isExists()) {
+      indices.prepareClose(index).get();
+      indices.delete(Requests.deleteIndexRequest(index)).get();
+    }
+  }
+
+  /** Inserts the given number of test documents into Elasticsearch. */
+  static void insertTestDocuments(String index, String type, long numDocs, Client client)
+      throws Exception {
+    final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setRefresh(true);
+    List<String> data =
+        ElasticSearchIOTestUtils.createDocuments(
+            numDocs, ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+    for (String document : data) {
+      bulkRequestBuilder.add(client.prepareIndex(index, type, null).setSource(document));
+    }
+    final BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
+    if (bulkResponse.hasFailures()) {
+      throw new IOException(
+          String.format(
+              "Cannot insert test documents in index %s : %s",
+              index, bulkResponse.buildFailureMessage()));
+    }
+  }
+
+  /**
+   * Forces an upgrade of the given index to make recently inserted documents available for search.
+   *
+   * @return The number of docs in the index
+   */
+  static long upgradeIndexAndGetCurrentNumDocs(String index, String type, Client client) {
+    try {
+      client.admin().indices().upgrade(new UpgradeRequest(index)).actionGet();
+      SearchResponse response =
+          client.prepareSearch(index).setTypes(type).execute().actionGet(5000);
+      return response.getHits().getTotalHits();
+      // it is fine to ignore the exceptions below because in testWriteWithBatchSize* we sometimes
+      // call upgrade before any doc has been written
+      // (when there are fewer docs processed than batchSize).
+      // In those cases the index/type has not been created (it is created upon first doc insertion)
+    } catch (IndexNotFoundException e) {
+    } catch (java.lang.IllegalArgumentException e) {
+      if (!e.getMessage().contains("No search type")) {
+        throw e;
+      }
+    }
+    return 0;
+  }
+
+  /**
+   * Generates a list of test documents for insertion.
+   *
+   * @param numDocs Number of docs to generate
+   * @param injectionMode {@link InjectionMode} that specifies whether to insert malformed documents
+   * @return the list of JSON Strings representing the documents
+   */
+  static List<String> createDocuments(long numDocs, InjectionMode injectionMode) {
+    String[] scientists = {
+      "Einstein",
+      "Darwin",
+      "Copernicus",
+      "Pasteur",
+      "Curie",
+      "Faraday",
+      "Newton",
+      "Bohr",
+      "Galilei",
+      "Maxwell"
+    };
+    ArrayList<String> data = new ArrayList<>();
+    for (int i = 0; i < numDocs; i++) {
+      int index = i % scientists.length;
+      // insert 2 malformed documents
+      if (InjectionMode.INJECT_SOME_INVALID_DOCS.equals(injectionMode) && (i == 6 || i == 7)) {
+        data.add(String.format("{\"scientist\";\"%s\", \"id\":%d}", scientists[index], i));
+      } else {
+        data.add(String.format("{\"scientist\":\"%s\", \"id\":%d}", scientists[index], i));
+      }
+    }
+    return data;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/6412389a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
new file mode 100644
index 0000000..8b4cb13
--- /dev/null
+++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.elasticsearch;
+
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BoundedElasticsearchSource;
+import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.core.Is.isA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.ServerSocket;
+import java.util.List;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.RunnableOnService;
+import org.apache.beam.sdk.testing.SourceTestUtils;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.values.PCollection;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.NodeBuilder;
+import org.hamcrest.CustomMatcher;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Tests for {@link ElasticsearchIO}. */
+@RunWith(JUnit4.class)
+public class ElasticsearchIOTest implements Serializable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchIOTest.class);
+
+  private static final String ES_INDEX = "beam";
+  private static final String ES_TYPE = "test";
+  private static final String ES_IP = "127.0.0.1";
+  private static final long NUM_DOCS = 400L;
+  private static final int NUM_SCIENTISTS = 10;
+  private static final long BATCH_SIZE = 200L;
+  private static final long AVERAGE_DOC_SIZE = 25L;
+  private static final long BATCH_SIZE_BYTES = 2048L;
+
+  private static Node node;
+  private static ElasticsearchIO.ConnectionConfiguration connectionConfiguration;
+
+  @ClassRule public static TemporaryFolder folder = new TemporaryFolder();
+  @Rule
+  public TestPipeline pipeline = TestPipeline.create();
+
+  @BeforeClass
+  public static void beforeClass() throws IOException {
+    ServerSocket serverSocket = new ServerSocket(0);
+    int esHttpPort = serverSocket.getLocalPort();
+    serverSocket.close();
+    connectionConfiguration =
+        ElasticsearchIO.ConnectionConfiguration.create(
+            new String[] {"http://" + ES_IP + ":" + esHttpPort}, ES_INDEX, ES_TYPE);
+    LOGGER.info("Starting embedded Elasticsearch instance ({})", esHttpPort);
+    Settings.Builder settingsBuilder =
+        Settings.settingsBuilder()
+            .put("cluster.name", "beam")
+            .put("http.enabled", "true")
+            .put("node.data", "true")
+            .put("path.data", folder.getRoot().getPath())
+            .put("path.home", folder.getRoot().getPath())
+            .put("node.name", "beam")
+            .put("network.host", ES_IP)
+            .put("http.port", esHttpPort)
+            .put("index.store.stats_refresh_interval", 0)
+            // had problems with some JDKs: embedded ES was too slow for bulk insertion,
+            // and the queue of 50 was full. No problem with a real ES instance
+            // (cf. the testWrite integration test)
+            .put("threadpool.bulk.queue_size", 100);
+    node = NodeBuilder.nodeBuilder().settings(settingsBuilder).build();
+    LOGGER.info("Elasticsearch node created");
+    node.start();
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    node.close();
+  }
+
+  @Before
+  public void before() throws Exception {
+    ElasticSearchIOTestUtils.deleteIndex(ES_INDEX, node.client());
+  }
+
+  @Test
+  public void testSizes() throws Exception {
+    ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client());
+    PipelineOptions options = PipelineOptionsFactory.create();
+    ElasticsearchIO.Read read =
+        ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration);
+    BoundedElasticsearchSource initialSource = new BoundedElasticsearchSource(read, null);
+    // can't use an equality assert as Elasticsearch indexes never have exactly the same size
+    // (due to internal Elasticsearch implementation details)
+    long estimatedSize = initialSource.getEstimatedSizeBytes(options);
+    LOGGER.info("Estimated size: {}", estimatedSize);
+    assertThat("Wrong estimated size", estimatedSize, greaterThan(AVERAGE_DOC_SIZE * NUM_DOCS));
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testRead() throws Exception {
+    ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client());
+
+    PCollection<String> output =
+        pipeline.apply(
+            ElasticsearchIO.read()
+                .withConnectionConfiguration(connectionConfiguration)
+                //set to default value, useful just to test parameter passing.
+                .withScrollKeepalive("5m")
+                //set to default value, useful just to test parameter passing.
+                .withBatchSize(100L));
+    PAssert.thatSingleton(output.apply("Count", Count.<String>globally())).isEqualTo(NUM_DOCS);
+    pipeline.run();
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testReadWithQuery() throws Exception {
+    ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client());
+
+    String query =
+        "{\n"
+            + "  \"query\": {\n"
+            + "  \"match\" : {\n"
+            + "    \"scientist\" : {\n"
+            + "      \"query\" : \"Einstein\",\n"
+            + "      \"type\" : \"boolean\"\n"
+            + "    }\n"
+            + "  }\n"
+            + "  }\n"
+            + "}";
+
+    PCollection<String> output =
+        pipeline.apply(
+            ElasticsearchIO.read()
+                .withConnectionConfiguration(connectionConfiguration)
+                .withQuery(query));
+    PAssert.thatSingleton(output.apply("Count", Count.<String>globally()))
+        .isEqualTo(NUM_DOCS / NUM_SCIENTISTS);
+    pipeline.run();
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testWrite() throws Exception {
+    List<String> data =
+        ElasticSearchIOTestUtils.createDocuments(
+            NUM_DOCS, ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+    pipeline
+        .apply(Create.of(data))
+        .apply(ElasticsearchIO.write().withConnectionConfiguration(connectionConfiguration));
+    pipeline.run();
+
+    long currentNumDocs =
+        ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs(ES_INDEX, ES_TYPE, node.client());
+    assertEquals(NUM_DOCS, currentNumDocs);
+
+    QueryBuilder queryBuilder = QueryBuilders.queryStringQuery("Einstein").field("scientist");
+    SearchResponse searchResponse =
+        node.client()
+            .prepareSearch(ES_INDEX)
+            .setTypes(ES_TYPE)
+            .setQuery(queryBuilder)
+            .execute()
+            .actionGet();
+    assertEquals(NUM_DOCS / NUM_SCIENTISTS, searchResponse.getHits().getTotalHits());
+  }
+
+  @Rule public ExpectedException exception = ExpectedException.none();
+
+  @Test
+  public void testWriteWithErrors() throws Exception {
+    ElasticsearchIO.Write write =
+        ElasticsearchIO.write()
+            .withConnectionConfiguration(connectionConfiguration)
+            .withMaxBatchSize(BATCH_SIZE);
+    // Bundle size is decided by the runner; we cannot force a particular bundle size,
+    // so we test the Writer as a DoFn outside of a runner.
+    DoFnTester<String, Void> fnTester = DoFnTester.of(new ElasticsearchIO.Write.WriteFn(write));
+
+    List<String> input =
+        ElasticSearchIOTestUtils.createDocuments(
+            NUM_DOCS, ElasticSearchIOTestUtils.InjectionMode.INJECT_SOME_INVALID_DOCS);
+    exception.expect(isA(IOException.class));
+    exception.expectMessage(
+        new CustomMatcher<String>("RegExp matcher") {
+          @Override
+          public boolean matches(Object o) {
+            String message = (String) o;
+            // This regexp checks that the 2 malformed documents are actually reported in error
+            // and that the message contains their IDs.
+            // It also ensures that the root reason, root error type,
+            // "caused by" reason and "caused by" error type are present in the message.
+            // To avoid test flakiness if the Elasticsearch error messages change,
+            // only the "failed to parse" root reason is matched literally;
+            // the other parts are matched with .+
+            return message.matches(
+                "(?is).*Error writing to Elasticsearch, some elements could not be inserted"
+                    + ".*Document id .+: failed to parse \\(.+\\).*Caused by: .+ \\(.+\\).*"
+                    + "Document id .+: failed to parse \\(.+\\).*Caused by: .+ \\(.+\\).*");
+          }
+        });
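+    // A hypothetical message accepted by the pattern above (the concrete reasons and error
+    // types come from Elasticsearch and may differ):
+    //   Error writing to Elasticsearch, some elements could not be inserted:
+    //     Document id 7: failed to parse (mapper_parsing_exception)
+    //       Caused by: unexpected character (json_parse_exception)
+    //     Document id 26: failed to parse (mapper_parsing_exception)
+    //       Caused by: unexpected character (json_parse_exception)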
+    // Insert the documents into Elasticsearch, which is expected to throw.
+    fnTester.processBundle(input);
+  }
+
+  @Test
+  public void testWriteWithMaxBatchSize() throws Exception {
+    ElasticsearchIO.Write write =
+        ElasticsearchIO.write()
+            .withConnectionConfiguration(connectionConfiguration)
+            .withMaxBatchSize(BATCH_SIZE);
+    // Bundle size is decided by the runner; we cannot force a particular bundle size,
+    // so we test the Writer as a DoFn outside of a runner.
+    DoFnTester<String, Void> fnTester = DoFnTester.of(new ElasticsearchIO.Write.WriteFn(write));
+    List<String> input =
+        ElasticSearchIOTestUtils.createDocuments(
+            NUM_DOCS, ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+    long numDocsProcessed = 0;
+    long numDocsInserted = 0;
+    for (String document : input) {
+      fnTester.processElement(document);
+      numDocsProcessed++;
+      // Check every 100 docs to avoid overloading Elasticsearch.
+      if ((numDocsProcessed % 100) == 0) {
+        // Force the index to upgrade after inserting so that the inserted docs
+        // are searchable immediately.
+        long currentNumDocs =
+            ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs(
+                ES_INDEX, ES_TYPE, node.client());
+        if ((numDocsProcessed % BATCH_SIZE) == 0) {
+          /* batch end */
+          assertEquals(
+              "at the end of a batch, all processed documents should have been inserted",
+              numDocsProcessed,
+              currentNumDocs);
+          numDocsInserted = currentNumDocs;
+        } else {
+          /* not batch end */
+          assertEquals(
+              "between batch boundaries, no additional documents should have been inserted",
+              numDocsInserted,
+              currentNumDocs);
+        }
+      }
+    }
+  }
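+  // A rough sketch of what a helper like upgradeIndexAndGetCurrentNumDocs needs to do with the
+  // Elasticsearch 2.x client: make freshly indexed documents visible to search, then count them.
+  // The exact calls are an assumption, not a copy of the test utility:
+  //
+  //   client.admin().indices().prepareRefresh(index).execute().actionGet();
+  //   SearchResponse response =
+  //       client.prepareSearch(index).setTypes(type).setSize(0).execute().actionGet();
+  //   return response.getHits().getTotalHits();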
+
+  @Test
+  public void testWriteWithMaxBatchSizeBytes() throws Exception {
+    ElasticsearchIO.Write write =
+        ElasticsearchIO.write()
+            .withConnectionConfiguration(connectionConfiguration)
+            .withMaxBatchSizeBytes(BATCH_SIZE_BYTES);
+    // Bundle size is decided by the runner; we cannot force a particular bundle size,
+    // so we test the Writer as a DoFn outside of a runner.
+    DoFnTester<String, Void> fnTester = DoFnTester.of(new ElasticsearchIO.Write.WriteFn(write));
+    List<String> input =
+        ElasticSearchIOTestUtils.createDocuments(
+            NUM_DOCS, ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+    long numDocsProcessed = 0;
+    long sizeProcessed = 0;
+    long numDocsInserted = 0;
+    long batchInserted = 0;
+    for (String document : input) {
+      fnTester.processElement(document);
+      numDocsProcessed++;
+      sizeProcessed += document.getBytes().length;
+      // Check every 40 docs to avoid overloading Elasticsearch.
+      if ((numDocsProcessed % 40) == 0) {
+        // Force the index to upgrade after inserting so that the inserted docs
+        // are searchable immediately.
+        long currentNumDocs =
+            ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs(
+                ES_INDEX, ES_TYPE, node.client());
+        if (sizeProcessed / BATCH_SIZE_BYTES > batchInserted) {
+          /* batch end */
+          assertThat(
+              "a full batch size (in bytes) has been passed, so some documents should have been inserted",
+              currentNumDocs,
+              greaterThan(numDocsInserted));
+          numDocsInserted = currentNumDocs;
+          batchInserted = (sizeProcessed / BATCH_SIZE_BYTES);
+        } else {
+          /* not batch end */
+          assertEquals(
+              "between batch boundaries, no additional documents should have been inserted",
+              numDocsInserted,
+              currentNumDocs);
+        }
+      }
+    }
+  }
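+  // A worked example of the size-based check above, with hypothetical numbers: if
+  // BATCH_SIZE_BYTES were 400 and each document were ~50 bytes, then at the first check
+  // (40 documents, ~2000 bytes) sizeProcessed / BATCH_SIZE_BYTES is 5, which is greater than
+  // batchInserted (still 0), so at least one flush must have happened and the document count
+  // must have grown; between such points the quotient does not advance and the test expects
+  // the count to stay flat.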
+
+  @Test
+  public void testSplitIntoBundles() throws Exception {
+    ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client());
+    PipelineOptions options = PipelineOptionsFactory.create();
+    ElasticsearchIO.Read read =
+        ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration);
+    BoundedElasticsearchSource initialSource = new BoundedElasticsearchSource(read, null);
+    // desiredBundleSizeBytes is ignored because there is no way to split shards in
+    // Elasticsearch 2.x, so we get as many bundles as there are shards and each bundle's
+    // size is the shard size.
+    int desiredBundleSizeBytes = 0;
+    List<? extends BoundedSource<String>> splits =
+        initialSource.splitIntoBundles(desiredBundleSizeBytes, options);
+    SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, options);
+    // This is the number of Elasticsearch shards
+    // (by default, each Elasticsearch index is allocated 5 primary shards).
+    int expectedNumSplits = 5;
+    assertEquals(expectedNumSplits, splits.size());
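+    // With the default of 5 primary shards, each split corresponds to one shard, so the
+    // NUM_DOCS test documents are spread across 5 splits and each split reads roughly
+    // NUM_DOCS / 5 of them.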
+    int nonEmptySplits = 0;
+    for (BoundedSource<String> subSource : splits) {
+      if (readFromSource(subSource, options).size() > 0) {
+        nonEmptySplits += 1;
+      }
+    }
+    assertEquals("Wrong number of empty splits", expectedNumSplits, nonEmptySplits);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/6412389a/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 75c4f65..ffe3c02 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -40,6 +40,7 @@
     <module>kinesis</module>
     <module>mongodb</module>
     <module>jdbc</module>
+    <module>elasticsearch</module>
     <module>mqtt</module>
   </modules>
 


[2/2] beam git commit: [BEAM-425] This closes #1439

Posted by jb...@apache.org.
[BEAM-425] This closes #1439


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/926ec8e8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/926ec8e8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/926ec8e8

Branch: refs/heads/master
Commit: 926ec8e8066f2161af878d19b75a1ea3774500a3
Parents: d1d85df 6412389
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Wed Jan 4 13:53:24 2017 +0100
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed Jan 4 13:53:24 2017 +0100

----------------------------------------------------------------------
 sdks/java/io/elasticsearch/pom.xml              | 175 ++++
 .../sdk/io/elasticsearch/ElasticsearchIO.java   | 819 +++++++++++++++++++
 .../beam/sdk/io/elasticsearch/package-info.java |  20 +
 .../elasticsearch/ElasticSearchIOTestUtils.java | 129 +++
 .../io/elasticsearch/ElasticsearchIOTest.java   | 358 ++++++++
 sdks/java/io/pom.xml                            |   1 +
 6 files changed, 1502 insertions(+)
----------------------------------------------------------------------