You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/03/17 14:52:05 UTC
[1/2] beam git commit: Add Elasticsearch 2.x version check in the
ElasticsearchIO
Repository: beam
Updated Branches:
refs/heads/master 25b52c5ac -> 969acb7da
Add Elasticsearch 2.x version check in the ElasticsearchIO
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2395d188
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2395d188
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2395d188
Branch: refs/heads/master
Commit: 2395d1880a4dfe3c2f8ff4cfdfb95b948a65aae0
Parents: 25b52c5
Author: Etienne Chauchot <ec...@gmail.com>
Authored: Mon Mar 13 11:45:21 2017 +0100
Committer: Jean-Baptiste Onofr� <jb...@apache.org>
Committed: Fri Mar 17 22:34:11 2017 +0800
----------------------------------------------------------------------
.../sdk/io/elasticsearch/ElasticsearchIO.java | 33 ++++++++++++++++----
.../io/elasticsearch/ElasticsearchIOTest.java | 6 ++--
.../elasticsearch/ElasticsearchTestDataSet.java | 7 +++--
3 files changed, 34 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/2395d188/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index b08cb24..54692b9 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -68,6 +68,7 @@ import org.elasticsearch.client.RestClientBuilder;
/**
* Transforms for reading and writing data from/to Elasticsearch.
+ * This IO is only compatible with Elasticsearch v2.x
*
* <h3>Reading from Elasticsearch</h3>
*
@@ -182,8 +183,10 @@ public class ElasticsearchIO {
* @param index the index toward which the requests will be issued
* @param type the document type toward which the requests will be issued
* @return the connection configuration object
+ * @throws IOException when it fails to connect to Elasticsearch
*/
- public static ConnectionConfiguration create(String[] addresses, String index, String type) {
+ public static ConnectionConfiguration create(String[] addresses, String index, String type)
+ throws IOException {
checkArgument(
addresses != null,
"ConnectionConfiguration.create(addresses, index, type) called with null address");
@@ -197,11 +200,29 @@ public class ElasticsearchIO {
checkArgument(
type != null,
"ConnectionConfiguration.create(addresses, index, type) called with null type");
- return new AutoValue_ElasticsearchIO_ConnectionConfiguration.Builder()
- .setAddresses(Arrays.asList(addresses))
- .setIndex(index)
- .setType(type)
- .build();
+ ConnectionConfiguration connectionConfiguration =
+ new AutoValue_ElasticsearchIO_ConnectionConfiguration.Builder()
+ .setAddresses(Arrays.asList(addresses))
+ .setIndex(index)
+ .setType(type)
+ .build();
+ checkVersion(connectionConfiguration);
+ return connectionConfiguration;
+ }
+
+ private static void checkVersion(ConnectionConfiguration connectionConfiguration)
+ throws IOException {
+ RestClient restClient = connectionConfiguration.createClient();
+ Response response = restClient.performRequest("GET", "", new BasicHeader("", ""));
+ JsonNode jsonNode = parseResponse(response);
+ String version = jsonNode.path("version").path("number").asText();
+ boolean version2x = version.startsWith("2.");
+ restClient.close();
+ checkArgument(
+ version2x,
+ "ConnectionConfiguration.create(addresses, index, type): "
+ + "the Elasticsearch version to connect to is different of 2.x. "
+ + "This version of the ElasticsearchIO is only compatible with Elasticsearch v2.x");
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/2395d188/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index b41d698..60054c6 100644
--- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -87,9 +87,6 @@ public class ElasticsearchIOTest implements Serializable {
ServerSocket serverSocket = new ServerSocket(0);
int esHttpPort = serverSocket.getLocalPort();
serverSocket.close();
- connectionConfiguration =
- ElasticsearchIO.ConnectionConfiguration.create(
- new String[] {"http://" + ES_IP + ":" + esHttpPort}, ES_INDEX, ES_TYPE);
LOG.info("Starting embedded Elasticsearch instance ({})", esHttpPort);
Settings.Builder settingsBuilder =
Settings.settingsBuilder()
@@ -108,6 +105,9 @@ public class ElasticsearchIOTest implements Serializable {
node = NodeBuilder.nodeBuilder().settings(settingsBuilder).build();
LOG.info("Elasticsearch node created");
node.start();
+ connectionConfiguration =
+ ElasticsearchIO.ConnectionConfiguration.create(
+ new String[] {"http://" + ES_IP + ":" + esHttpPort}, ES_INDEX, ES_TYPE);
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/beam/blob/2395d188/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
index 6ce89f1..bd8141f 100644
--- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
+++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.elasticsearch;
import static java.net.InetAddress.getByName;
+import java.io.IOException;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
@@ -78,12 +79,12 @@ public class ElasticsearchTestDataSet {
.addTransportAddress(
new InetSocketTransportAddress(
getByName(options.getElasticsearchServer()),
- Integer.valueOf(options.getElasticsearchTcpPort())));
+ options.getElasticsearchTcpPort()));
return client;
}
public static ElasticsearchIO.ConnectionConfiguration getConnectionConfiguration(
- ElasticsearchTestOptions options, ReadOrWrite rOw) {
+ ElasticsearchTestOptions options, ReadOrWrite rOw) throws IOException {
ElasticsearchIO.ConnectionConfiguration connectionConfiguration =
ElasticsearchIO.ConnectionConfiguration.create(
new String[] {
@@ -95,7 +96,7 @@ public class ElasticsearchTestDataSet {
(rOw == ReadOrWrite.READ) ? ES_INDEX : writeIndex,
ES_TYPE);
return connectionConfiguration;
- };
+ }
public static void deleteIndex(TransportClient client, ReadOrWrite rOw) throws Exception {
ElasticSearchIOTestUtils.deleteIndex((rOw == ReadOrWrite.READ) ? ES_INDEX : writeIndex, client);
[2/2] beam git commit: This closes #2230
Posted by jb...@apache.org.
This closes #2230
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/969acb7d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/969acb7d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/969acb7d
Branch: refs/heads/master
Commit: 969acb7da5e5d7d7a8e8bd6cce4376239b6e63c3
Parents: 25b52c5 2395d18
Author: Jean-Baptiste Onofr� <jb...@apache.org>
Authored: Fri Mar 17 22:51:57 2017 +0800
Committer: Jean-Baptiste Onofr� <jb...@apache.org>
Committed: Fri Mar 17 22:51:57 2017 +0800
----------------------------------------------------------------------
.../sdk/io/elasticsearch/ElasticsearchIO.java | 33 ++++++++++++++++----
.../io/elasticsearch/ElasticsearchIOTest.java | 6 ++--
.../elasticsearch/ElasticsearchTestDataSet.java | 7 +++--
3 files changed, 34 insertions(+), 12 deletions(-)
----------------------------------------------------------------------