You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/28 08:06:21 UTC

[1/2] beam git commit: [BEAM-2541] Check Elasticsearch backend version when the pipeline is run not when it is constructed

Repository: beam
Updated Branches:
  refs/heads/master 68bb7c0f5 -> 6ca53055b


[BEAM-2541] Check Elasticsearch backend version when the pipeline is run not when it is constructed


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f61e797f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f61e797f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f61e797f

Branch: refs/heads/master
Commit: f61e797fbb5cf16942199c47bfff3ad84739700c
Parents: 892a2cf
Author: Etienne Chauchot <ec...@gmail.com>
Authored: Wed Jul 5 11:35:59 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Thu Jul 27 23:36:01 2017 +0200

----------------------------------------------------------------------
 .../sdk/io/elasticsearch/ElasticsearchIO.java   | 40 ++++++++++----------
 .../elasticsearch/ElasticsearchTestDataSet.java | 11 ++----
 2 files changed, 23 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f61e797f/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index 4d76887..8e6e253 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -183,10 +183,8 @@ public class ElasticsearchIO {
      * @param index the index toward which the requests will be issued
      * @param type the document type toward which the requests will be issued
      * @return the connection configuration object
-     * @throws IOException when it fails to connect to Elasticsearch
      */
-    public static ConnectionConfiguration create(String[] addresses, String index, String type)
-        throws IOException {
+    public static ConnectionConfiguration create(String[] addresses, String index, String type){
       checkArgument(
           addresses != null,
           "ConnectionConfiguration.create(addresses, index, type) called with null address");
@@ -206,25 +204,9 @@ public class ElasticsearchIO {
               .setIndex(index)
               .setType(type)
               .build();
-      checkVersion(connectionConfiguration);
       return connectionConfiguration;
     }
 
-    private static void checkVersion(ConnectionConfiguration connectionConfiguration)
-        throws IOException {
-      RestClient restClient = connectionConfiguration.createClient();
-      Response response = restClient.performRequest("GET", "", new BasicHeader("", ""));
-      JsonNode jsonNode = parseResponse(response);
-      String version = jsonNode.path("version").path("number").asText();
-      boolean version2x = version.startsWith("2.");
-      restClient.close();
-      checkArgument(
-          version2x,
-          "ConnectionConfiguration.create(addresses, index, type): "
-              + "the Elasticsearch version to connect to is different of 2.x. "
-              + "This version of the ElasticsearchIO is only compatible with Elasticsearch v2.x");
-    }
-
     /**
      * If Elasticsearch authentication is enabled, provide the username.
      *
@@ -398,10 +380,12 @@ public class ElasticsearchIO {
 
     @Override
     public void validate(PipelineOptions options) {
+      ConnectionConfiguration connectionConfiguration = getConnectionConfiguration();
       checkState(
-          getConnectionConfiguration() != null,
+          connectionConfiguration != null,
           "ElasticsearchIO.read() requires a connection configuration"
               + " to be set via withConnectionConfiguration(configuration)");
+      checkVersion(connectionConfiguration);
     }
 
     @Override
@@ -715,10 +699,12 @@ public class ElasticsearchIO {
 
     @Override
     public void validate(PipelineOptions options) {
+      ConnectionConfiguration connectionConfiguration = getConnectionConfiguration();
       checkState(
-          getConnectionConfiguration() != null,
+          connectionConfiguration != null,
           "ElasticsearchIO.write() requires a connection configuration"
               + " to be set via withConnectionConfiguration(configuration)");
+      checkVersion(connectionConfiguration);
     }
 
     @Override
@@ -828,4 +814,16 @@ public class ElasticsearchIO {
       }
     }
   }
+  private static void checkVersion(ConnectionConfiguration connectionConfiguration){
+    try (RestClient restClient = connectionConfiguration.createClient()) {
+      Response response = restClient.performRequest("GET", "", new BasicHeader("", ""));
+      JsonNode jsonNode = parseResponse(response);
+      String version = jsonNode.path("version").path("number").asText();
+      boolean version2x = version.startsWith("2.");
+      checkArgument(version2x, "The Elasticsearch version to connect to is different of 2.x. "
+          + "This version of the ElasticsearchIO is only compatible with Elasticsearch v2.x");
+    } catch (IOException e) {
+      throw new IllegalArgumentException("Cannot check Elasticsearch version");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f61e797f/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
index e2c291b..a6e1cc0 100644
--- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
+++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.io.elasticsearch;
 
 
-import java.io.IOException;
 import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.elasticsearch.client.RestClient;
@@ -37,7 +36,7 @@ public class ElasticsearchTestDataSet {
   public static final long NUM_DOCS = 60000;
   public static final int AVERAGE_DOC_SIZE = 25;
   public static final int MAX_DOC_SIZE = 35;
-  private static String writeIndex = ES_INDEX + System.currentTimeMillis();
+  private static final String writeIndex = ES_INDEX + System.currentTimeMillis();
 
   /**
    * Use this to create the index for reading before IT read tests.
@@ -63,17 +62,15 @@ public class ElasticsearchTestDataSet {
   }
 
   private static void createAndPopulateReadIndex(IOTestPipelineOptions options) throws Exception {
-    RestClient restClient =  getConnectionConfiguration(options, ReadOrWrite.READ).createClient();
     // automatically creates the index and insert docs
-    try {
+    try (RestClient restClient = getConnectionConfiguration(options, ReadOrWrite.READ)
+        .createClient()) {
       ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, restClient);
-    } finally {
-      restClient.close();
     }
   }
 
   static ElasticsearchIO.ConnectionConfiguration getConnectionConfiguration(
-      IOTestPipelineOptions options, ReadOrWrite rOw) throws IOException {
+      IOTestPipelineOptions options, ReadOrWrite rOw){
     ElasticsearchIO.ConnectionConfiguration connectionConfiguration =
         ElasticsearchIO.ConnectionConfiguration.create(
             new String[] {


[2/2] beam git commit: [BEAM-2541] This closes #3493

Posted by jb...@apache.org.
[BEAM-2541] This closes #3493


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6ca53055
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6ca53055
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6ca53055

Branch: refs/heads/master
Commit: 6ca53055be7049ecb9c93d8ff89652a1880f0c9b
Parents: 68bb7c0 f61e797
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Fri Jul 28 10:06:14 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Fri Jul 28 10:06:14 2017 +0200

----------------------------------------------------------------------
 .../sdk/io/elasticsearch/ElasticsearchIO.java   | 40 ++++++++++----------
 .../elasticsearch/ElasticsearchTestDataSet.java | 11 ++----
 2 files changed, 23 insertions(+), 28 deletions(-)
----------------------------------------------------------------------