You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/28 08:06:21 UTC
[1/2] beam git commit: [BEAM-2541] Check Elasticsearch backend
version when the pipeline is run not when it is constructed
Repository: beam
Updated Branches:
refs/heads/master 68bb7c0f5 -> 6ca53055b
[BEAM-2541] Check Elasticsearch backend version when the pipeline is run not when it is constructed
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f61e797f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f61e797f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f61e797f
Branch: refs/heads/master
Commit: f61e797fbb5cf16942199c47bfff3ad84739700c
Parents: 892a2cf
Author: Etienne Chauchot <ec...@gmail.com>
Authored: Wed Jul 5 11:35:59 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Thu Jul 27 23:36:01 2017 +0200
----------------------------------------------------------------------
.../sdk/io/elasticsearch/ElasticsearchIO.java | 40 ++++++++++----------
.../elasticsearch/ElasticsearchTestDataSet.java | 11 ++----
2 files changed, 23 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f61e797f/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index 4d76887..8e6e253 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -183,10 +183,8 @@ public class ElasticsearchIO {
* @param index the index toward which the requests will be issued
* @param type the document type toward which the requests will be issued
* @return the connection configuration object
- * @throws IOException when it fails to connect to Elasticsearch
*/
- public static ConnectionConfiguration create(String[] addresses, String index, String type)
- throws IOException {
+ public static ConnectionConfiguration create(String[] addresses, String index, String type){
checkArgument(
addresses != null,
"ConnectionConfiguration.create(addresses, index, type) called with null address");
@@ -206,25 +204,9 @@ public class ElasticsearchIO {
.setIndex(index)
.setType(type)
.build();
- checkVersion(connectionConfiguration);
return connectionConfiguration;
}
- private static void checkVersion(ConnectionConfiguration connectionConfiguration)
- throws IOException {
- RestClient restClient = connectionConfiguration.createClient();
- Response response = restClient.performRequest("GET", "", new BasicHeader("", ""));
- JsonNode jsonNode = parseResponse(response);
- String version = jsonNode.path("version").path("number").asText();
- boolean version2x = version.startsWith("2.");
- restClient.close();
- checkArgument(
- version2x,
- "ConnectionConfiguration.create(addresses, index, type): "
- + "the Elasticsearch version to connect to is different of 2.x. "
- + "This version of the ElasticsearchIO is only compatible with Elasticsearch v2.x");
- }
-
/**
* If Elasticsearch authentication is enabled, provide the username.
*
@@ -398,10 +380,12 @@ public class ElasticsearchIO {
@Override
public void validate(PipelineOptions options) {
+ ConnectionConfiguration connectionConfiguration = getConnectionConfiguration();
checkState(
- getConnectionConfiguration() != null,
+ connectionConfiguration != null,
"ElasticsearchIO.read() requires a connection configuration"
+ " to be set via withConnectionConfiguration(configuration)");
+ checkVersion(connectionConfiguration);
}
@Override
@@ -715,10 +699,12 @@ public class ElasticsearchIO {
@Override
public void validate(PipelineOptions options) {
+ ConnectionConfiguration connectionConfiguration = getConnectionConfiguration();
checkState(
- getConnectionConfiguration() != null,
+ connectionConfiguration != null,
"ElasticsearchIO.write() requires a connection configuration"
+ " to be set via withConnectionConfiguration(configuration)");
+ checkVersion(connectionConfiguration);
}
@Override
@@ -828,4 +814,16 @@ public class ElasticsearchIO {
}
}
}
+ /** Rejects (IllegalArgumentException) any backend whose reported version is not 2.x. */
+ private static void checkVersion(ConnectionConfiguration connectionConfiguration){
+   try (RestClient restClient = connectionConfiguration.createClient()) {
+     Response response = restClient.performRequest("GET", "", new BasicHeader("", ""));
+     String version = parseResponse(response).path("version").path("number").asText();
+     boolean version2x = version.startsWith("2.");
+     checkArgument(version2x, "The Elasticsearch version to connect to is different of 2.x. "
+         + "This version of the ElasticsearchIO is only compatible with Elasticsearch v2.x");
+   } catch (IOException e) {
+     throw new IllegalArgumentException("Cannot check Elasticsearch version", e); // keep cause
+   }
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f61e797f/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
index e2c291b..a6e1cc0 100644
--- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
+++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
@@ -18,7 +18,6 @@
package org.apache.beam.sdk.io.elasticsearch;
-import java.io.IOException;
import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.elasticsearch.client.RestClient;
@@ -37,7 +36,7 @@ public class ElasticsearchTestDataSet {
public static final long NUM_DOCS = 60000;
public static final int AVERAGE_DOC_SIZE = 25;
public static final int MAX_DOC_SIZE = 35;
- private static String writeIndex = ES_INDEX + System.currentTimeMillis();
+ private static final String writeIndex = ES_INDEX + System.currentTimeMillis();
/**
* Use this to create the index for reading before IT read tests.
@@ -63,17 +62,15 @@ public class ElasticsearchTestDataSet {
}
private static void createAndPopulateReadIndex(IOTestPipelineOptions options) throws Exception {
- RestClient restClient = getConnectionConfiguration(options, ReadOrWrite.READ).createClient();
// automatically creates the index and insert docs
- try {
+ try (RestClient restClient = getConnectionConfiguration(options, ReadOrWrite.READ)
+ .createClient()) {
ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, restClient);
- } finally {
- restClient.close();
}
}
static ElasticsearchIO.ConnectionConfiguration getConnectionConfiguration(
- IOTestPipelineOptions options, ReadOrWrite rOw) throws IOException {
+ IOTestPipelineOptions options, ReadOrWrite rOw){
ElasticsearchIO.ConnectionConfiguration connectionConfiguration =
ElasticsearchIO.ConnectionConfiguration.create(
new String[] {
[2/2] beam git commit: [BEAM-2541] This closes #3493
Posted by jb...@apache.org.
[BEAM-2541] This closes #3493
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6ca53055
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6ca53055
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6ca53055
Branch: refs/heads/master
Commit: 6ca53055be7049ecb9c93d8ff89652a1880f0c9b
Parents: 68bb7c0 f61e797
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Fri Jul 28 10:06:14 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Fri Jul 28 10:06:14 2017 +0200
----------------------------------------------------------------------
.../sdk/io/elasticsearch/ElasticsearchIO.java | 40 ++++++++++----------
.../elasticsearch/ElasticsearchTestDataSet.java | 11 ++----
2 files changed, 23 insertions(+), 28 deletions(-)
----------------------------------------------------------------------