You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/01/04 12:53:44 UTC
[1/2] beam git commit: [BEAM-425] Add ElasticsearchIO
Repository: beam
Updated Branches:
refs/heads/master d1d85dfc7 -> 926ec8e80
[BEAM-425] Add ElasticsearchIO
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6412389a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6412389a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6412389a
Branch: refs/heads/master
Commit: 6412389a02ad0dbdac5cfe5425a8a56462071477
Parents: d1d85df
Author: Etienne Chauchot and Jean-Baptiste Onofré <ec...@apache.org>
Authored: Thu Oct 20 11:45:47 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed Jan 4 13:39:07 2017 +0100
----------------------------------------------------------------------
sdks/java/io/elasticsearch/pom.xml | 175 ++++
.../sdk/io/elasticsearch/ElasticsearchIO.java | 819 +++++++++++++++++++
.../beam/sdk/io/elasticsearch/package-info.java | 20 +
.../elasticsearch/ElasticSearchIOTestUtils.java | 129 +++
.../io/elasticsearch/ElasticsearchIOTest.java | 358 ++++++++
sdks/java/io/pom.xml | 1 +
6 files changed, 1502 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/6412389a/sdks/java/io/elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/pom.xml b/sdks/java/io/elasticsearch/pom.xml
new file mode 100644
index 0000000..94e8c6c
--- /dev/null
+++ b/sdks/java/io/elasticsearch/pom.xml
@@ -0,0 +1,175 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-parent</artifactId>
+ <version>0.5.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>beam-sdks-java-io-elasticsearch</artifactId>
+ <name>Apache Beam :: SDKs :: Java :: IO :: Elasticsearch</name>
+ <description>IO to read from and write to Elasticsearch.</description>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>2.6.2</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.elasticsearch.client</groupId>
+ <artifactId>rest</artifactId>
+ <version>5.0.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore-nio</artifactId>
+ <version>4.4.5</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ <version>4.4.5</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpasyncclient</artifactId>
+ <version>4.1.2</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.5.2</version>
+ </dependency>
+
+ <!-- annotation processor: provided scope, needed only at compile time -->
+ <dependency>
+ <groupId>com.google.auto.value</groupId>
+ <artifactId>auto-value</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- test -->
+ <dependency>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <version>2.4.1</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-core</artifactId>
+ <version>1.3</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>1.3.2</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/6412389a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
new file mode 100644
index 0000000..5073834
--- /dev/null
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -0,0 +1,819 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.elasticsearch;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.message.BasicHeader;
+import org.apache.http.nio.entity.NStringEntity;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+
+/**
+ * Transforms for reading and writing data from/to Elasticsearch.
+ *
+ * <h3>Reading from Elasticsearch</h3>
+ *
+ * <p>{@link ElasticsearchIO#read ElasticsearchIO.read()} returns a bounded
+ * {@link PCollection PCollection&lt;String&gt;} representing JSON documents.
+ *
+ * <p>To configure the {@link ElasticsearchIO#read}, you have to provide a connection configuration
+ * containing the HTTP address of the instances, an index name and a type. The following example
+ * illustrates options for configuring the source:
+ *
+ * <pre>{@code
+ *
+ * pipeline.apply(ElasticsearchIO.read().withConnectionConfiguration(
+ *    ElasticsearchIO.ConnectionConfiguration.create(
+ *        new String[] {"http://host:9200"}, "my-index", "my-type")
+ * ));
+ *
+ * }</pre>
+ *
+ * <p>The connection configuration also accepts optional configuration: {@code withUsername()} and
+ * {@code withPassword()}.
+ *
+ * <p>You can also specify a query on the {@code read()} using {@code withQuery()}.
+ *
+ * <h3>Writing to Elasticsearch</h3>
+ *
+ * <p>To write documents to Elasticsearch, use
+ * {@link ElasticsearchIO#write ElasticsearchIO.write()}, which writes JSON documents from a
+ * {@link PCollection PCollection&lt;String&gt;} (which can be bounded or unbounded).
+ *
+ * <p>To configure {@link ElasticsearchIO#write ElasticsearchIO.write()}, similar to the read, you
+ * have to provide a connection configuration. For instance:
+ *
+ * <pre>{@code
+ *
+ * pipeline
+ * .apply(...)
+ *   .apply(ElasticsearchIO.write().withConnectionConfiguration(
+ *      ElasticsearchIO.ConnectionConfiguration.create(
+ *          new String[] {"http://host:9200"}, "my-index", "my-type")
+ *   ));
+ *
+ * }</pre>
+ *
+ * <p>Optionally, you can provide {@code withMaxBatchSize()} and {@code withMaxBatchSizeBytes()}
+ * to specify the maximum size of the write batch in number of documents or in bytes.
+ */
+public class ElasticsearchIO {
+
+ public static Read read() {
+ // default scrollKeepalive = 5m as a majorant for un-predictable time between 2 start/read calls
+ // default batchSize to 100 as recommended by ES dev team as a safe value when dealing
+ // with big documents and still a good compromise for performances
+ return new AutoValue_ElasticsearchIO_Read.Builder()
+ .setScrollKeepalive("5m")
+ .setBatchSize(100L)
+ .build();
+ }
+
+ public static Write write() {
+ return new AutoValue_ElasticsearchIO_Write.Builder()
+ // advised default starting batch size in ES docs
+ .setMaxBatchSize(1000L)
+ // advised default starting batch size in ES docs
+ .setMaxBatchSizeBytes(5L * 1024L * 1024L)
+ .build();
+ }
+
+  /** Holder of static factories ({@link #read()}, {@link #write()}); not instantiable. */
+  private ElasticsearchIO() {
+  }
+
+  /**
+   * Parses the JSON body of an Elasticsearch REST response.
+   *
+   * @param response the response returned by the low-level REST client
+   * @return the response body as a {@link JsonObject}
+   * @throws IOException if the response body cannot be read
+   */
+  private static JsonObject parseResponse(Response response) throws IOException {
+    // try-with-resources closes the entity's content stream once Gson has consumed it; the
+    // previous version never closed it. The body is decoded as UTF-8, as before, but via the
+    // Charset constant instead of a charset-name lookup.
+    try (InputStream content = response.getEntity().getContent();
+        InputStreamReader reader = new InputStreamReader(content, StandardCharsets.UTF_8)) {
+      return new Gson().fromJson(reader, JsonObject.class);
+    }
+  }
+
+ /** A POJO describing a connection configuration to Elasticsearch. */
+ @AutoValue
+ public abstract static class ConnectionConfiguration implements Serializable {
+
+ abstract List<String> getAddresses();
+
+ @Nullable
+ abstract String getUsername();
+
+ @Nullable
+ abstract String getPassword();
+
+ abstract String getIndex();
+
+ abstract String getType();
+
+ abstract Builder builder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setAddresses(List<String> addresses);
+
+ abstract Builder setUsername(String username);
+
+ abstract Builder setPassword(String password);
+
+ abstract Builder setIndex(String index);
+
+ abstract Builder setType(String type);
+
+ abstract ConnectionConfiguration build();
+ }
+
+ /**
+ * Creates a new Elasticsearch connection configuration.
+ *
+ * @param addresses list of addresses of Elasticsearch nodes
+ * @param index the index toward which the requests will be issued
+ * @param type the document type toward which the requests will be issued
+ * @return the connection configuration object
+ */
+ public static ConnectionConfiguration create(String[] addresses, String index, String type) {
+ checkArgument(
+ addresses != null,
+ "ConnectionConfiguration.create(addresses, index, type) called with null address");
+ checkArgument(
+ addresses.length != 0,
+ "ConnectionConfiguration.create(addresses, "
+ + "index, type) called with empty addresses");
+ checkArgument(
+ index != null,
+ "ConnectionConfiguration.create(addresses, index, type) called with null index");
+ checkArgument(
+ type != null,
+ "ConnectionConfiguration.create(addresses, index, type) called with null type");
+ return new AutoValue_ElasticsearchIO_ConnectionConfiguration.Builder()
+ .setAddresses(Arrays.asList(addresses))
+ .setIndex(index)
+ .setType(type)
+ .build();
+ }
+
+ /**
+ * If Elasticsearch authentication is enabled, provide the username.
+ *
+ * @param username the username used to authenticate to Elasticsearch
+ * @return the {@link ConnectionConfiguration} object with username set
+ */
+ public ConnectionConfiguration withUsername(String username) {
+ checkArgument(
+ username != null,
+ "ConnectionConfiguration.create().withUsername(username) called with null username");
+ checkArgument(
+ !username.isEmpty(),
+ "ConnectionConfiguration.create().withUsername(username) called with empty username");
+ return builder().setUsername(username).build();
+ }
+
+ /**
+ * If Elasticsearch authentication is enabled, provide the password.
+ *
+ * @param password the password used to authenticate to Elasticsearch
+ * @return the {@link ConnectionConfiguration} object with password set
+ */
+ public ConnectionConfiguration withPassword(String password) {
+ checkArgument(
+ password != null,
+ "ConnectionConfiguration.create().withPassword(password) called with null password");
+ checkArgument(
+ !password.isEmpty(),
+ "ConnectionConfiguration.create().withPassword(password) called with empty password");
+ return builder().setPassword(password).build();
+ }
+
+ private void populateDisplayData(DisplayData.Builder builder) {
+ builder.add(DisplayData.item("address", getAddresses().toString()));
+ builder.add(DisplayData.item("index", getIndex()));
+ builder.add(DisplayData.item("type", getType()));
+ builder.addIfNotNull(DisplayData.item("username", getUsername()));
+ }
+
+ private RestClient createClient() throws MalformedURLException {
+ HttpHost[] hosts = new HttpHost[getAddresses().size()];
+ int i = 0;
+ for (String address : getAddresses()) {
+ URL url = new URL(address);
+ hosts[i] = new HttpHost(url.getHost(), url.getPort(), url.getProtocol());
+ i++;
+ }
+ RestClientBuilder restClientBuilder = RestClient.builder(hosts);
+ if (getUsername() != null) {
+ final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ credentialsProvider.setCredentials(
+ AuthScope.ANY, new UsernamePasswordCredentials(getUsername(), getPassword()));
+ restClientBuilder.setHttpClientConfigCallback(
+ new RestClientBuilder.HttpClientConfigCallback() {
+ public HttpAsyncClientBuilder customizeHttpClient(
+ HttpAsyncClientBuilder httpAsyncClientBuilder) {
+ return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+ }
+ });
+ }
+ return restClientBuilder.build();
+ }
+ }
+
+ /** A {@link PTransform} reading data from Elasticsearch. */
+ @AutoValue
+ public abstract static class Read extends PTransform<PBegin, PCollection<String>> {
+
+ private static final long MAX_BATCH_SIZE = 10000L;
+
+ @Nullable
+ abstract ConnectionConfiguration getConnectionConfiguration();
+
+ @Nullable
+ abstract String getQuery();
+
+ abstract String getScrollKeepalive();
+
+ abstract long getBatchSize();
+
+ abstract Builder builder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setConnectionConfiguration(ConnectionConfiguration connectionConfiguration);
+
+ abstract Builder setQuery(String query);
+
+ abstract Builder setScrollKeepalive(String scrollKeepalive);
+
+ abstract Builder setBatchSize(long batchSize);
+
+ abstract Read build();
+ }
+
+ /**
+ * Provide the Elasticsearch connection configuration object.
+ *
+ * @param connectionConfiguration the Elasticsearch {@link ConnectionConfiguration} object
+ * @return the {@link Read} with connection configuration set
+ */
+ public Read withConnectionConfiguration(ConnectionConfiguration connectionConfiguration) {
+ checkArgument(
+ connectionConfiguration != null,
+ "ElasticsearchIO.read()"
+ + ".withConnectionConfiguration(configuration) called with null configuration");
+ return builder().setConnectionConfiguration(connectionConfiguration).build();
+ }
+
+ /**
+ * Provide a query used while reading from Elasticsearch.
+ *
+ * @param query the query. See <a
+ * href="https://www.elastic.co/guide/en/elasticsearch/reference/2.4/query-dsl.html">Query
+ * DSL</a>
+ * @return the {@link Read} object with query set
+ */
+ public Read withQuery(String query) {
+ checkArgument(
+ !Strings.isNullOrEmpty(query),
+ "ElasticsearchIO.read().withQuery(query) called" + " with null or empty query");
+ return builder().setQuery(query).build();
+ }
+
+ /**
+ * Provide a scroll keepalive. See <a
+ * href="https://www.elastic.co/guide/en/elasticsearch/reference/2.4/search-request-scroll.html">scroll
+ * API</a> Default is "5m". Change this only if you get "No search context found" errors.
+ *
+ * @param scrollKeepalive keepalive duration ex "5m" from 5 minutes
+ * @return the {@link Read} with scroll keepalive set
+ */
+ public Read withScrollKeepalive(String scrollKeepalive) {
+ checkArgument(
+ scrollKeepalive != null && !scrollKeepalive.equals("0m"),
+ "ElasticsearchIO.read().withScrollKeepalive(keepalive) called"
+ + " with null or \"0m\" keepalive");
+ return builder().setScrollKeepalive(scrollKeepalive).build();
+ }
+
+ /**
+ * Provide a size for the scroll read. See <a
+ * href="https://www.elastic.co/guide/en/elasticsearch/reference/2.4/search-request-scroll.html">
+ * scroll API</a> Default is 100. Maximum is 10 000. If documents are small, increasing batch
+ * size might improve read performance. If documents are big, you might need to decrease
+ * batchSize
+ *
+ * @param batchSize number of documents read in each scroll read
+ * @return the {@link Read} with batch size set
+ */
+ public Read withBatchSize(long batchSize) {
+ checkArgument(
+ batchSize > 0,
+ "ElasticsearchIO.read().withBatchSize(batchSize) called with a negative "
+ + "or equal to 0 value: %s",
+ batchSize);
+ checkArgument(
+ batchSize <= MAX_BATCH_SIZE,
+ "ElasticsearchIO.read().withBatchSize(batchSize) "
+ + "called with a too large value (over %s): %s",
+ MAX_BATCH_SIZE,
+ batchSize);
+ return builder().setBatchSize(batchSize).build();
+ }
+
+ @Override
+ public PCollection<String> expand(PBegin input) {
+ return input.apply(
+ org.apache.beam.sdk.io.Read.from(new BoundedElasticsearchSource(this, null)));
+ }
+
+ @Override
+ public void validate(PBegin input) {
+ checkState(
+ getConnectionConfiguration() != null,
+ "ElasticsearchIO.read() requires a connection configuration"
+ + " to be set via withConnectionConfiguration(configuration)");
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder.addIfNotNull(DisplayData.item("query", getQuery()));
+ getConnectionConfiguration().populateDisplayData(builder);
+ }
+ }
+
+  /** A {@link BoundedSource} reading from Elasticsearch. */
+  @VisibleForTesting
+  static class BoundedElasticsearchSource extends BoundedSource<String> {
+
+    private final ElasticsearchIO.Read spec;
+    // shardPreference is the shard number from which the source will read the documents;
+    // null means no shard preference is sent, i.e. the whole index is searched.
+    @Nullable private final String shardPreference;
+
+    BoundedElasticsearchSource(Read spec, @Nullable String shardPreference) {
+      this.spec = spec;
+      this.shardPreference = shardPreference;
+    }
+
+    /**
+     * Splits into one source per shard id of the index. {@code desiredBundleSizeBytes} is
+     * ignored because a single shard cannot be read in parallel with Elasticsearch 2.x.
+     */
+    @Override
+    public List<? extends BoundedSource<String>> splitIntoBundles(
+        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+      List<BoundedElasticsearchSource> sources = new ArrayList<>();
+
+      // 1. We split per shard:
+      // unfortunately, Elasticsearch 2.x doesn't provide a way to do parallel reads on a single
+      // shard. So we do not use desiredBundleSize because we cannot split shards.
+      // With the slice API in ES 5.0 we will be able to use desiredBundleSize.
+      // Basically we will just ask the slice API to return data
+      // in nbBundles = estimatedSize / desiredBundleSize chunks.
+      // So each beam source will read around desiredBundleSize volume of data.
+
+      // 2. Primary and replica shards have the same shard_id, so we create one source per
+      // shard_id that has a primary copy. Even if we specify preference=_shards:2,
+      // ES load balances (round robin) the request between primary shard 2 and replica shard 2.
+      // But, as each shard (replica or primary) is responsible for only one part of the data,
+      // there will be no duplicate.
+      //
+      // The stats API lists all copies of a shard id in an array, and the primary is not
+      // documented to come first. Checking only the first entry would silently drop the whole
+      // shard (and its documents) whenever a replica is listed first, so all copies are checked.
+
+      JsonObject statsJson = getStats(true);
+      JsonObject shardsJson =
+          statsJson
+              .getAsJsonObject("indices")
+              .getAsJsonObject(spec.getConnectionConfiguration().getIndex())
+              .getAsJsonObject("shards");
+      Set<Map.Entry<String, JsonElement>> shards = shardsJson.entrySet();
+      for (Map.Entry<String, JsonElement> shardJson : shards) {
+        String shardId = shardJson.getKey();
+        if (hasPrimaryCopy(shardJson.getValue().getAsJsonArray())) {
+          sources.add(new BoundedElasticsearchSource(spec, shardId));
+        }
+      }
+      checkArgument(!sources.isEmpty(), "No primary shard found");
+      return sources;
+    }
+
+    /** Returns true if any copy in a shard's stats array has {@code routing.primary == true}. */
+    private static boolean hasPrimaryCopy(JsonArray shardCopies) {
+      for (JsonElement shardCopy : shardCopies) {
+        boolean isPrimary =
+            shardCopy
+                .getAsJsonObject()
+                .getAsJsonObject("routing")
+                .getAsJsonPrimitive("primary")
+                .getAsBoolean();
+        if (isPrimary) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) throws IOException {
+      // we use indices stats API to estimate size and list the shards
+      // (https://www.elastic.co/guide/en/elasticsearch/reference/2.4/indices-stats.html)
+      // as Elasticsearch 2.x doesn't support any way to do parallel read inside a shard
+      // the estimated size bytes is not really used in the split into bundles.
+      // However, we implement this method anyway as the runners can use it.
+      // NB: Elasticsearch 5.x now provides the slice API.
+      // (https://www.elastic.co/guide/en/elasticsearch/reference/5.0/search-request-scroll.html
+      // #sliced-scroll)
+      JsonObject statsJson = getStats(false);
+      JsonObject indexStats =
+          statsJson
+              .getAsJsonObject("indices")
+              .getAsJsonObject(spec.getConnectionConfiguration().getIndex())
+              .getAsJsonObject("primaries");
+      JsonObject store = indexStats.getAsJsonObject("store");
+      return store.getAsJsonPrimitive("size_in_bytes").getAsLong();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      spec.populateDisplayData(builder);
+      builder.addIfNotNull(DisplayData.item("shard", shardPreference));
+    }
+
+    @Override
+    public BoundedReader<String> createReader(PipelineOptions options) throws IOException {
+      return new BoundedElasticsearchReader(this);
+    }
+
+    @Override
+    public void validate() {
+      spec.validate(null);
+    }
+
+    @Override
+    public Coder<String> getDefaultOutputCoder() {
+      return StringUtf8Coder.of();
+    }
+
+    /** Calls {@code GET /<index>/_stats}, at shard level when {@code shardLevel} is true. */
+    private JsonObject getStats(boolean shardLevel) throws IOException {
+      HashMap<String, String> params = new HashMap<>();
+      if (shardLevel) {
+        params.put("level", "shards");
+      }
+      String endpoint = String.format("/%s/_stats", spec.getConnectionConfiguration().getIndex());
+      try (RestClient restClient = spec.getConnectionConfiguration().createClient()) {
+        return parseResponse(
+            restClient.performRequest("GET", endpoint, params, new BasicHeader("", "")));
+      }
+    }
+  }
+
+  /** Reads the documents of one {@link BoundedElasticsearchSource} through the scroll API. */
+  private static class BoundedElasticsearchReader extends BoundedSource.BoundedReader<String> {
+
+    private final BoundedElasticsearchSource source;
+
+    private RestClient restClient;
+    private String current;
+    private String scrollId;
+    private ListIterator<String> batchIterator;
+
+    private BoundedElasticsearchReader(BoundedElasticsearchSource source) {
+      this.source = source;
+    }
+
+    /** Opens the client and issues the initial scroll search; returns false if there are no hits. */
+    @Override
+    public boolean start() throws IOException {
+      restClient = source.spec.getConnectionConfiguration().createClient();
+
+      String query = source.spec.getQuery();
+      if (query == null) {
+        query = "{ \"query\": { \"match_all\": {} } }";
+      }
+
+      Response response;
+      String endPoint =
+          String.format(
+              "/%s/%s/_search",
+              source.spec.getConnectionConfiguration().getIndex(),
+              source.spec.getConnectionConfiguration().getType());
+      Map<String, String> params = new HashMap<>();
+      params.put("scroll", source.spec.getScrollKeepalive());
+      params.put("size", String.valueOf(source.spec.getBatchSize()));
+      if (source.shardPreference != null) {
+        params.put("preference", "_shards:" + source.shardPreference);
+      }
+      HttpEntity queryEntity = new NStringEntity(query, ContentType.APPLICATION_JSON);
+      response =
+          restClient.performRequest("GET", endPoint, params, queryEntity, new BasicHeader("", ""));
+      JsonObject searchResult = parseResponse(response);
+      updateScrollId(searchResult);
+      return readNextBatchAndReturnFirstDocument(searchResult);
+    }
+
+    private void updateScrollId(JsonObject searchResult) {
+      scrollId = searchResult.getAsJsonPrimitive("_scroll_id").getAsString();
+    }
+
+    /** Moves to the next buffered document, fetching the next scroll page when needed. */
+    @Override
+    public boolean advance() throws IOException {
+      if (batchIterator == null) {
+        // An earlier page came back empty: the scroll is exhausted.
+        return false;
+      }
+      if (batchIterator.hasNext()) {
+        current = batchIterator.next();
+        return true;
+      } else {
+        String requestBody =
+            String.format(
+                "{\"scroll\" : \"%s\",\"scroll_id\" : \"%s\"}",
+                source.spec.getScrollKeepalive(), scrollId);
+        HttpEntity scrollEntity = new NStringEntity(requestBody, ContentType.APPLICATION_JSON);
+        Response response =
+            restClient.performRequest(
+                "GET",
+                "/_search/scroll",
+                Collections.<String, String>emptyMap(),
+                scrollEntity,
+                new BasicHeader("", ""));
+        JsonObject searchResult = parseResponse(response);
+        updateScrollId(searchResult);
+        return readNextBatchAndReturnFirstDocument(searchResult);
+      }
+    }
+
+    /**
+     * Buffers the {@code _source} of every hit and makes the first one current; returns false
+     * (and clears the buffer) when the page has no hits.
+     */
+    private boolean readNextBatchAndReturnFirstDocument(JsonObject searchResult) {
+      // stop if no more data
+      JsonArray hits = searchResult.getAsJsonObject("hits").getAsJsonArray("hits");
+      if (hits.size() == 0) {
+        current = null;
+        batchIterator = null;
+        return false;
+      }
+      List<String> batch = new ArrayList<>();
+      for (JsonElement hit : hits) {
+        String document = hit.getAsJsonObject().getAsJsonObject("_source").toString();
+        batch.add(document);
+      }
+      batchIterator = batch.listIterator();
+      current = batchIterator.next();
+      return true;
+    }
+
+    @Override
+    public String getCurrent() throws NoSuchElementException {
+      if (current == null) {
+        throw new NoSuchElementException();
+      }
+      return current;
+    }
+
+    /**
+     * Clears the server-side scroll context (if one was opened) and closes the client. Safe to
+     * call when {@link #start()} failed before the client was created or before any scroll id
+     * was obtained.
+     */
+    @Override
+    public void close() throws IOException {
+      if (restClient == null) {
+        // start() failed before the client existed: nothing to release.
+        return;
+      }
+      try {
+        if (scrollId != null) {
+          // remove the scroll; without an id (initial search failed) there is nothing to clear
+          String requestBody = String.format("{\"scroll_id\" : [\"%s\"]}", scrollId);
+          HttpEntity entity = new NStringEntity(requestBody, ContentType.APPLICATION_JSON);
+          restClient.performRequest(
+              "DELETE",
+              "/_search/scroll",
+              Collections.<String, String>emptyMap(),
+              entity,
+              new BasicHeader("", ""));
+        }
+      } finally {
+        restClient.close();
+      }
+    }
+
+    @Override
+    public BoundedSource<String> getCurrentSource() {
+      return source;
+    }
+  }
+
+ /** A {@link PTransform} writing data to Elasticsearch. */
+ @AutoValue
+ public abstract static class Write extends PTransform<PCollection<String>, PDone> {
+
+ // Connection settings; null until withConnectionConfiguration() is called (checked in validate()).
+ @Nullable
+ abstract ConnectionConfiguration getConnectionConfiguration();
+
+ // Flush threshold in number of buffered documents (default 1000, set in write()).
+ abstract long getMaxBatchSize();
+
+ // Flush threshold in accumulated bytes of buffered documents (default 5 MB, set in write()).
+ abstract long getMaxBatchSizeBytes();
+
+ // Returns a builder holding this transform's current values; used by the with*() methods.
+ abstract Builder builder();
+
+ // AutoValue builder. NOTE(review): AutoValue derives the generated constructor/toString order
+ // from the property declaration order above, so keep that order stable.
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setConnectionConfiguration(ConnectionConfiguration connectionConfiguration);
+
+ abstract Builder setMaxBatchSize(long maxBatchSize);
+
+ abstract Builder setMaxBatchSizeBytes(long maxBatchSizeBytes);
+
+ abstract Write build();
+ }
+
+ /**
+ * Provide the Elasticsearch connection configuration object.
+ *
+ * @param connectionConfiguration the Elasticsearch {@link ConnectionConfiguration} object
+ * @return the {@link Write} with connection configuration set
+ */
+ public Write withConnectionConfiguration(ConnectionConfiguration connectionConfiguration) {
+ checkArgument(
+ connectionConfiguration != null,
+ "ElasticsearchIO.write()"
+ + ".withConnectionConfiguration(configuration) called with null configuration");
+ return builder().setConnectionConfiguration(connectionConfiguration).build();
+ }
+
+ /**
+ * Provide a maximum size in number of documents for the batch see bulk API
+ * (https://www.elastic.co/guide/en/elasticsearch/reference/2.4/docs-bulk.html). Default is 1000
+ * docs (like Elasticsearch bulk size advice). See
+ * https://www.elastic.co/guide/en/elasticsearch/guide/current/bulk.html Depending on the
+ * execution engine, size of bundles may vary, this sets the maximum size. Change this if you
+ * need to have smaller ElasticSearch bulks.
+ *
+ * @param batchSize maximum batch size in number of documents
+ * @return the {@link Write} with connection batch size set
+ */
+ public Write withMaxBatchSize(long batchSize) {
+ checkArgument(
+ batchSize > 0,
+ "ElasticsearchIO.write()"
+ + ".withMaxBatchSize(batchSize) called with incorrect <= 0 value");
+ return builder().setMaxBatchSize(batchSize).build();
+ }
+
+    /**
+     * Provides a maximum size, in bytes, for each bulk request (see the bulk API:
+     * https://www.elastic.co/guide/en/elasticsearch/reference/2.4/docs-bulk.html). Default is 5MB,
+     * following the Elasticsearch bulk size advice
+     * (https://www.elastic.co/guide/en/elasticsearch/guide/current/bulk.html). Depending on the
+     * execution engine, the size of bundles may vary; this only sets the maximum size. Change this
+     * if you need smaller Elasticsearch bulks.
+     *
+     * @param batchSizeBytes maximum batch size in bytes; must be greater than 0
+     * @return the {@link Write} with the maximum batch size in bytes set
+     * @throws IllegalArgumentException if {@code batchSizeBytes} is not greater than 0
+     */
+    public Write withMaxBatchSizeBytes(long batchSizeBytes) {
+      checkArgument(
+          batchSizeBytes > 0,
+          "ElasticsearchIO.write()"
+              + ".withMaxBatchSizeBytes(batchSizeBytes) called with incorrect <= 0 value");
+      return builder().setMaxBatchSizeBytes(batchSizeBytes).build();
+    }
+
+ @Override
+ public void validate(PCollection<String> input) {
+ checkState(
+ getConnectionConfiguration() != null,
+ "ElasticsearchIO.write() requires a connection configuration"
+ + " to be set via withConnectionConfiguration(configuration)");
+ }
+
+ @Override
+ public PDone expand(PCollection<String> input) {
+ input.apply(ParDo.of(new WriteFn(this)));
+ return PDone.in(input.getPipeline());
+ }
+
+ @VisibleForTesting
+ static class WriteFn extends DoFn<String, Void> {
+
+ private final Write spec;
+
+ private transient RestClient restClient;
+ private ArrayList<String> batch;
+ private long currentBatchSizeBytes;
+
+ WriteFn(Write spec) {
+ this.spec = spec;
+ }
+
+ @Setup
+ public void createClient() throws Exception {
+ restClient = spec.getConnectionConfiguration().createClient();
+ }
+
+ @StartBundle
+ public void startBundle(Context context) throws Exception {
+ batch = new ArrayList<>();
+ currentBatchSizeBytes = 0;
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext context) throws Exception {
+ String document = context.element();
+ batch.add(String.format("{ \"index\" : {} }%n%s%n", document));
+ currentBatchSizeBytes += document.getBytes().length;
+ if (batch.size() >= spec.getMaxBatchSize()
+ || currentBatchSizeBytes >= spec.getMaxBatchSizeBytes()) {
+ finishBundle(context);
+ }
+ }
+
+ @FinishBundle
+ public void finishBundle(Context context) throws Exception {
+ if (batch.isEmpty()) {
+ return;
+ }
+ StringBuilder bulkRequest = new StringBuilder();
+ for (String json : batch) {
+ bulkRequest.append(json);
+ }
+ batch.clear();
+ currentBatchSizeBytes = 0;
+ Response response;
+ String endPoint =
+ String.format(
+ "/%s/%s/_bulk",
+ spec.getConnectionConfiguration().getIndex(),
+ spec.getConnectionConfiguration().getType());
+ HttpEntity requestBody =
+ new NStringEntity(bulkRequest.toString(), ContentType.APPLICATION_JSON);
+ response =
+ restClient.performRequest(
+ "POST",
+ endPoint,
+ Collections.<String, String>emptyMap(),
+ requestBody,
+ new BasicHeader("", ""));
+ JsonObject searchResult = parseResponse(response);
+ boolean errors = searchResult.getAsJsonPrimitive("errors").getAsBoolean();
+ if (errors) {
+ StringBuilder errorMessages =
+ new StringBuilder(
+ "Error writing to Elasticsearch, some elements could not be inserted:");
+ JsonArray items = searchResult.getAsJsonArray("items");
+ //some items present in bulk might have errors, concatenate error messages
+ for (JsonElement item : items) {
+ JsonObject creationObject = item.getAsJsonObject().getAsJsonObject("create");
+ JsonObject error = creationObject.getAsJsonObject("error");
+ if (error != null) {
+ String type = error.getAsJsonPrimitive("type").getAsString();
+ String reason = error.getAsJsonPrimitive("reason").getAsString();
+ String docId = creationObject.getAsJsonPrimitive("_id").getAsString();
+ errorMessages.append(String.format("%nDocument id %s: %s (%s)", docId, reason, type));
+ JsonObject causedBy = error.getAsJsonObject("caused_by");
+ if (causedBy != null) {
+ String cbReason = causedBy.getAsJsonPrimitive("reason").getAsString();
+ String cbType = causedBy.getAsJsonPrimitive("type").getAsString();
+ errorMessages.append(String.format("%nCaused by: %s (%s)", cbReason, cbType));
+ }
+ }
+ }
+ throw new IOException(errorMessages.toString());
+ }
+ }
+
+ @Teardown
+ public void closeClient() throws Exception {
+ if (restClient != null) {
+ restClient.close();
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/6412389a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/package-info.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/package-info.java
new file mode 100644
index 0000000..396705b
--- /dev/null
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Transforms for reading and writing from Elasticsearch. */
+package org.apache.beam.sdk.io.elasticsearch;
http://git-wip-us.apache.org/repos/asf/beam/blob/6412389a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
new file mode 100644
index 0000000..9a121f8
--- /dev/null
+++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.elasticsearch;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
+import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.IndicesAdminClient;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.index.IndexNotFoundException;
+
+/** Test utilities to use with {@link ElasticsearchIO}. */
+class ElasticSearchIOTestUtils {
+
+ /** Enumeration that specifies whether to insert malformed documents. */
+ enum InjectionMode {
+ INJECT_SOME_INVALID_DOCS,
+ DO_NOT_INJECT_INVALID_DOCS;
+ }
+
+ /** Deletes the given index synchronously. */
+ static void deleteIndex(String index, Client client) throws Exception {
+ IndicesAdminClient indices = client.admin().indices();
+ IndicesExistsResponse indicesExistsResponse =
+ indices.exists(new IndicesExistsRequest(index)).get();
+ if (indicesExistsResponse.isExists()) {
+ indices.prepareClose(index).get();
+ indices.delete(Requests.deleteIndexRequest(index)).get();
+ }
+ }
+
+ /** Inserts the given number of test documents into Elasticsearch. */
+ static void insertTestDocuments(String index, String type, long numDocs, Client client)
+ throws Exception {
+ final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setRefresh(true);
+ List<String> data =
+ ElasticSearchIOTestUtils.createDocuments(
+ numDocs, ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+ for (String document : data) {
+ bulkRequestBuilder.add(client.prepareIndex(index, type, null).setSource(document));
+ }
+ final BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
+ if (bulkResponse.hasFailures()) {
+ throw new IOException(
+ String.format(
+ "Cannot insert test documents in index %s : %s",
+ index, bulkResponse.buildFailureMessage()));
+ }
+ }
+
+ /**
+ * Forces an upgrade of the given index to make recently inserted documents available for search.
+ *
+ * @return The number of docs in the index
+ */
+ static long upgradeIndexAndGetCurrentNumDocs(String index, String type, Client client) {
+ try {
+ client.admin().indices().upgrade(new UpgradeRequest(index)).actionGet();
+ SearchResponse response =
+ client.prepareSearch(index).setTypes(type).execute().actionGet(5000);
+ return response.getHits().getTotalHits();
+ // it is fine to ignore bellow exceptions because in testWriteWithBatchSize* sometimes,
+ // we call upgrade before any doc have been written
+ // (when there are fewer docs processed than batchSize).
+ // In that cases index/type has not been created (created upon first doc insertion)
+ } catch (IndexNotFoundException e) {
+ } catch (java.lang.IllegalArgumentException e) {
+ if (!e.getMessage().contains("No search type")) {
+ throw e;
+ }
+ }
+ return 0;
+ }
+
+ /**
+ * Generates a list of test documents for insertion.
+ *
+ * @param numDocs Number of docs to generate
+ * @param injectionMode {@link InjectionMode} that specifies whether to insert malformed documents
+ * @return the list of json String representing the documents
+ */
+ static List<String> createDocuments(long numDocs, InjectionMode injectionMode) {
+ String[] scientists = {
+ "Einstein",
+ "Darwin",
+ "Copernicus",
+ "Pasteur",
+ "Curie",
+ "Faraday",
+ "Newton",
+ "Bohr",
+ "Galilei",
+ "Maxwell"
+ };
+ ArrayList<String> data = new ArrayList<>();
+ for (int i = 0; i < numDocs; i++) {
+ int index = i % scientists.length;
+ // insert 2 malformed documents
+ if (InjectionMode.INJECT_SOME_INVALID_DOCS.equals(injectionMode) && (i == 6 || i == 7)) {
+ data.add(String.format("{\"scientist\";\"%s\", \"id\":%d}", scientists[index], i));
+ } else {
+ data.add(String.format("{\"scientist\":\"%s\", \"id\":%d}", scientists[index], i));
+ }
+ }
+ return data;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/6412389a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
new file mode 100644
index 0000000..8b4cb13
--- /dev/null
+++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.elasticsearch;
+
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BoundedElasticsearchSource;
+import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.core.Is.isA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.ServerSocket;
+import java.util.List;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.RunnableOnService;
+import org.apache.beam.sdk.testing.SourceTestUtils;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.values.PCollection;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.NodeBuilder;
+import org.hamcrest.CustomMatcher;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Tests for {@link ElasticsearchIO}. */
+@RunWith(JUnit4.class)
+public class ElasticsearchIOTest implements Serializable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchIOTest.class);
+
+  private static final String ES_INDEX = "beam";
+  private static final String ES_TYPE = "test";
+  private static final String ES_IP = "127.0.0.1";
+  private static final long NUM_DOCS = 400L;
+  private static final int NUM_SCIENTISTS = 10;
+  private static final long BATCH_SIZE = 200L;
+  private static final long AVERAGE_DOC_SIZE = 25L;
+  private static final long BATCH_SIZE_BYTES = 2048L;
+
+  private static Node node;
+  private static ElasticsearchIO.ConnectionConfiguration connectionConfiguration;
+
+  @ClassRule public static TemporaryFolder folder = new TemporaryFolder();
+
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  @Rule public ExpectedException exception = ExpectedException.none();
+
+  @BeforeClass
+  public static void beforeClass() throws IOException {
+    // Ask the OS for a free port, then release it so the embedded node can bind to it.
+    int esHttpPort;
+    try (ServerSocket serverSocket = new ServerSocket(0)) {
+      esHttpPort = serverSocket.getLocalPort();
+    }
+    connectionConfiguration =
+        ElasticsearchIO.ConnectionConfiguration.create(
+            new String[] {"http://" + ES_IP + ":" + esHttpPort}, ES_INDEX, ES_TYPE);
+    LOGGER.info("Starting embedded Elasticsearch instance ({})", esHttpPort);
+    Settings.Builder settingsBuilder =
+        Settings.settingsBuilder()
+            .put("cluster.name", "beam")
+            .put("http.enabled", "true")
+            .put("node.data", "true")
+            .put("path.data", folder.getRoot().getPath())
+            .put("path.home", folder.getRoot().getPath())
+            .put("node.name", "beam")
+            .put("network.host", ES_IP)
+            .put("http.port", esHttpPort)
+            .put("index.store.stats_refresh_interval", 0)
+            // had problems with some jdk, embedded ES was too slow for bulk insertion,
+            // and queue of 50 was full. No pb with real ES instance (cf testWrite integration test)
+            .put("threadpool.bulk.queue_size", 100);
+    node = NodeBuilder.nodeBuilder().settings(settingsBuilder).build();
+    LOGGER.info("Elasticsearch node created");
+    node.start();
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    // node is null if beforeClass failed early; don't mask that failure with an NPE.
+    if (node != null) {
+      node.close();
+    }
+  }
+
+  @Before
+  public void before() throws Exception {
+    ElasticSearchIOTestUtils.deleteIndex(ES_INDEX, node.client());
+  }
+
+  @Test
+  public void testSizes() throws Exception {
+    ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client());
+    PipelineOptions options = PipelineOptionsFactory.create();
+    ElasticsearchIO.Read read =
+        ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration);
+    BoundedElasticsearchSource initialSource = new BoundedElasticsearchSource(read, null);
+    // can't use equal assert as Elasticsearch indexes never have same size
+    // (due to internal Elasticsearch implementation)
+    long estimatedSize = initialSource.getEstimatedSizeBytes(options);
+    LOGGER.info("Estimated size: {}", estimatedSize);
+    assertThat("Wrong estimated size", estimatedSize, greaterThan(AVERAGE_DOC_SIZE * NUM_DOCS));
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testRead() throws Exception {
+    ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client());
+
+    PCollection<String> output =
+        pipeline.apply(
+            ElasticsearchIO.read()
+                .withConnectionConfiguration(connectionConfiguration)
+                // set to default value, useful just to test parameter passing.
+                .withScrollKeepalive("5m")
+                // set to default value, useful just to test parameter passing.
+                .withBatchSize(100L));
+    PAssert.thatSingleton(output.apply("Count", Count.<String>globally())).isEqualTo(NUM_DOCS);
+    pipeline.run();
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testReadWithQuery() throws Exception {
+    ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client());
+
+    String query =
+        "{\n"
+            + "  \"query\": {\n"
+            + "  \"match\" : {\n"
+            + "    \"scientist\" : {\n"
+            + "      \"query\" : \"Einstein\",\n"
+            + "      \"type\" : \"boolean\"\n"
+            + "    }\n"
+            + "  }\n"
+            + "  }\n"
+            + "}";
+
+    PCollection<String> output =
+        pipeline.apply(
+            ElasticsearchIO.read()
+                .withConnectionConfiguration(connectionConfiguration)
+                .withQuery(query));
+    PAssert.thatSingleton(output.apply("Count", Count.<String>globally()))
+        .isEqualTo(NUM_DOCS / NUM_SCIENTISTS);
+    pipeline.run();
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testWrite() throws Exception {
+    List<String> data =
+        ElasticSearchIOTestUtils.createDocuments(
+            NUM_DOCS, ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+    pipeline
+        .apply(Create.of(data))
+        .apply(ElasticsearchIO.write().withConnectionConfiguration(connectionConfiguration));
+    pipeline.run();
+
+    long currentNumDocs =
+        ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs(ES_INDEX, ES_TYPE, node.client());
+    assertEquals(NUM_DOCS, currentNumDocs);
+
+    QueryBuilder queryBuilder = QueryBuilders.queryStringQuery("Einstein").field("scientist");
+    SearchResponse searchResponse =
+        node.client()
+            .prepareSearch(ES_INDEX)
+            .setTypes(ES_TYPE)
+            .setQuery(queryBuilder)
+            .execute()
+            .actionGet();
+    assertEquals(NUM_DOCS / NUM_SCIENTISTS, searchResponse.getHits().getTotalHits());
+  }
+
+  @Test
+  public void testWriteWithErrors() throws Exception {
+    ElasticsearchIO.Write write =
+        ElasticsearchIO.write()
+            .withConnectionConfiguration(connectionConfiguration)
+            .withMaxBatchSize(BATCH_SIZE);
+    // write bundles size is the runner decision, we cannot force a bundle size,
+    // so we test the Writer as a DoFn outside of a runner.
+    DoFnTester<String, Void> fnTester = DoFnTester.of(new ElasticsearchIO.Write.WriteFn(write));
+
+    List<String> input =
+        ElasticSearchIOTestUtils.createDocuments(
+            NUM_DOCS, ElasticSearchIOTestUtils.InjectionMode.INJECT_SOME_INVALID_DOCS);
+    exception.expect(isA(IOException.class));
+    exception.expectMessage(
+        new CustomMatcher<String>("RegExp matcher") {
+          @Override
+          public boolean matches(Object o) {
+            String message = (String) o;
+            // This regexp tests that 2 malformed documents are actually in error
+            // and that the message contains their IDs.
+            // It also ensures that root reason, root error type,
+            // caused by reason and caused by error type are present in message.
+            // To avoid flakiness of the test in case of Elasticsearch error message change,
+            // only "failed to parse" root reason is matched,
+            // the other messages are matched using .+
+            return message.matches(
+                "(?is).*Error writing to Elasticsearch, some elements could not be inserted"
+                    + ".*Document id .+: failed to parse \\(.+\\).*Caused by: .+ \\(.+\\).*"
+                    + "Document id .+: failed to parse \\(.+\\).*Caused by: .+ \\(.+\\).*");
+          }
+        });
+    // inserts into Elasticsearch
+    fnTester.processBundle(input);
+  }
+
+  @Test
+  public void testWriteWithMaxBatchSize() throws Exception {
+    ElasticsearchIO.Write write =
+        ElasticsearchIO.write()
+            .withConnectionConfiguration(connectionConfiguration)
+            .withMaxBatchSize(BATCH_SIZE);
+    // write bundles size is the runner decision, we cannot force a bundle size,
+    // so we test the Writer as a DoFn outside of a runner.
+    DoFnTester<String, Void> fnTester = DoFnTester.of(new ElasticsearchIO.Write.WriteFn(write));
+    List<String> input =
+        ElasticSearchIOTestUtils.createDocuments(
+            NUM_DOCS, ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+    long numDocsProcessed = 0;
+    long numDocsInserted = 0;
+    for (String document : input) {
+      fnTester.processElement(document);
+      numDocsProcessed++;
+      // test every 100 docs to avoid overloading ES
+      if ((numDocsProcessed % 100) == 0) {
+        // force the index to upgrade after inserting for the inserted docs
+        // to be searchable immediately
+        long currentNumDocs =
+            ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs(
+                ES_INDEX, ES_TYPE, node.client());
+        if ((numDocsProcessed % BATCH_SIZE) == 0) {
+          /* bundle end */
+          assertEquals(
+              "we are at the end of a bundle, we should have inserted all processed documents",
+              numDocsProcessed,
+              currentNumDocs);
+          numDocsInserted = currentNumDocs;
+        } else {
+          /* not bundle end */
+          assertEquals(
+              "we are not at the end of a bundle, we should have inserted no more documents",
+              numDocsInserted,
+              currentNumDocs);
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testWriteWithMaxBatchSizeBytes() throws Exception {
+    ElasticsearchIO.Write write =
+        ElasticsearchIO.write()
+            .withConnectionConfiguration(connectionConfiguration)
+            .withMaxBatchSizeBytes(BATCH_SIZE_BYTES);
+    // write bundles size is the runner decision, we cannot force a bundle size,
+    // so we test the Writer as a DoFn outside of a runner.
+    DoFnTester<String, Void> fnTester = DoFnTester.of(new ElasticsearchIO.Write.WriteFn(write));
+    List<String> input =
+        ElasticSearchIOTestUtils.createDocuments(
+            NUM_DOCS, ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+    long numDocsProcessed = 0;
+    long sizeProcessed = 0;
+    long numDocsInserted = 0;
+    long batchInserted = 0;
+    for (String document : input) {
+      fnTester.processElement(document);
+      numDocsProcessed++;
+      // count bytes with an explicit charset rather than the platform default
+      sizeProcessed += document.getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
+      // test every 40 docs to avoid overloading ES
+      if ((numDocsProcessed % 40) == 0) {
+        // force the index to upgrade after inserting for the inserted docs
+        // to be searchable immediately
+        long currentNumDocs =
+            ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs(
+                ES_INDEX, ES_TYPE, node.client());
+        if (sizeProcessed / BATCH_SIZE_BYTES > batchInserted) {
+          /* bundle end */
+          assertThat(
+              "we have passed a bundle size, we should have inserted some documents",
+              currentNumDocs,
+              greaterThan(numDocsInserted));
+          numDocsInserted = currentNumDocs;
+          batchInserted = (sizeProcessed / BATCH_SIZE_BYTES);
+        } else {
+          /* not bundle end */
+          assertEquals(
+              "we are not at the end of a bundle, we should have inserted no more documents",
+              numDocsInserted,
+              currentNumDocs);
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testSplitIntoBundles() throws Exception {
+    ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client());
+    PipelineOptions options = PipelineOptionsFactory.create();
+    ElasticsearchIO.Read read =
+        ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration);
+    BoundedElasticsearchSource initialSource = new BoundedElasticsearchSource(read, null);
+    // desiredBundleSize is ignored because in ES 2.x there is no way to split shards. So we get
+    // as many bundles as ES shards and bundle size is shard size
+    int desiredBundleSizeBytes = 0;
+    List<? extends BoundedSource<String>> splits =
+        initialSource.splitIntoBundles(desiredBundleSizeBytes, options);
+    SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, options);
+    // this is the number of ES shards
+    // (By default, each index in Elasticsearch is allocated 5 primary shards)
+    int expectedNumSplits = 5;
+    assertEquals(expectedNumSplits, splits.size());
+    int nonEmptySplits = 0;
+    for (BoundedSource<String> subSource : splits) {
+      if (readFromSource(subSource, options).size() > 0) {
+        nonEmptySplits += 1;
+      }
+    }
+    assertEquals("Wrong number of non-empty splits", expectedNumSplits, nonEmptySplits);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/6412389a/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 75c4f65..ffe3c02 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -40,6 +40,7 @@
<module>kinesis</module>
<module>mongodb</module>
<module>jdbc</module>
+ <module>elasticsearch</module>
<module>mqtt</module>
</modules>
[2/2] beam git commit: [BEAM-425] This closes #1439
Posted by jb...@apache.org.
[BEAM-425] This closes #1439
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/926ec8e8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/926ec8e8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/926ec8e8
Branch: refs/heads/master
Commit: 926ec8e8066f2161af878d19b75a1ea3774500a3
Parents: d1d85df 6412389
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Wed Jan 4 13:53:24 2017 +0100
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed Jan 4 13:53:24 2017 +0100
----------------------------------------------------------------------
sdks/java/io/elasticsearch/pom.xml | 175 ++++
.../sdk/io/elasticsearch/ElasticsearchIO.java | 819 +++++++++++++++++++
.../beam/sdk/io/elasticsearch/package-info.java | 20 +
.../elasticsearch/ElasticSearchIOTestUtils.java | 129 +++
.../io/elasticsearch/ElasticsearchIOTest.java | 358 ++++++++
sdks/java/io/pom.xml | 1 +
6 files changed, 1502 insertions(+)
----------------------------------------------------------------------