You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/03/17 14:52:05 UTC

[1/2] beam git commit: Add Elasticsearch 2.x version check in the ElasticsearchIO

Repository: beam
Updated Branches:
  refs/heads/master 25b52c5ac -> 969acb7da


Add Elasticsearch 2.x version check in the ElasticsearchIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2395d188
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2395d188
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2395d188

Branch: refs/heads/master
Commit: 2395d1880a4dfe3c2f8ff4cfdfb95b948a65aae0
Parents: 25b52c5
Author: Etienne Chauchot <ec...@gmail.com>
Authored: Mon Mar 13 11:45:21 2017 +0100
Committer: Jean-Baptiste Onofr� <jb...@apache.org>
Committed: Fri Mar 17 22:34:11 2017 +0800

----------------------------------------------------------------------
 .../sdk/io/elasticsearch/ElasticsearchIO.java   | 33 ++++++++++++++++----
 .../io/elasticsearch/ElasticsearchIOTest.java   |  6 ++--
 .../elasticsearch/ElasticsearchTestDataSet.java |  7 +++--
 3 files changed, 34 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2395d188/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index b08cb24..54692b9 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -68,6 +68,7 @@ import org.elasticsearch.client.RestClientBuilder;
 
 /**
  * Transforms for reading and writing data from/to Elasticsearch.
+ * This IO is only compatible with Elasticsearch v2.x
  *
  * <h3>Reading from Elasticsearch</h3>
  *
@@ -182,8 +183,10 @@ public class ElasticsearchIO {
      * @param index the index toward which the requests will be issued
      * @param type the document type toward which the requests will be issued
      * @return the connection configuration object
+     * @throws IOException when it fails to connect to Elasticsearch
      */
-    public static ConnectionConfiguration create(String[] addresses, String index, String type) {
+    public static ConnectionConfiguration create(String[] addresses, String index, String type)
+        throws IOException {
       checkArgument(
           addresses != null,
           "ConnectionConfiguration.create(addresses, index, type) called with null address");
@@ -197,11 +200,29 @@ public class ElasticsearchIO {
       checkArgument(
           type != null,
           "ConnectionConfiguration.create(addresses, index, type) called with null type");
-      return new AutoValue_ElasticsearchIO_ConnectionConfiguration.Builder()
-          .setAddresses(Arrays.asList(addresses))
-          .setIndex(index)
-          .setType(type)
-          .build();
+      ConnectionConfiguration connectionConfiguration =
+          new AutoValue_ElasticsearchIO_ConnectionConfiguration.Builder()
+              .setAddresses(Arrays.asList(addresses))
+              .setIndex(index)
+              .setType(type)
+              .build();
+      checkVersion(connectionConfiguration);
+      return connectionConfiguration;
+    }
+
+    private static void checkVersion(ConnectionConfiguration connectionConfiguration)
+        throws IOException {
+      RestClient restClient = connectionConfiguration.createClient();
+      Response response = restClient.performRequest("GET", "", new BasicHeader("", ""));
+      JsonNode jsonNode = parseResponse(response);
+      String version = jsonNode.path("version").path("number").asText();
+      boolean version2x = version.startsWith("2.");
+      restClient.close();
+      checkArgument(
+          version2x,
+          "ConnectionConfiguration.create(addresses, index, type): "
+              + "the Elasticsearch version to connect to is different of 2.x. "
+              + "This version of the ElasticsearchIO is only compatible with Elasticsearch v2.x");
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/beam/blob/2395d188/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index b41d698..60054c6 100644
--- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -87,9 +87,6 @@ public class ElasticsearchIOTest implements Serializable {
     ServerSocket serverSocket = new ServerSocket(0);
     int esHttpPort = serverSocket.getLocalPort();
     serverSocket.close();
-    connectionConfiguration =
-        ElasticsearchIO.ConnectionConfiguration.create(
-            new String[] {"http://" + ES_IP + ":" + esHttpPort}, ES_INDEX, ES_TYPE);
     LOG.info("Starting embedded Elasticsearch instance ({})", esHttpPort);
     Settings.Builder settingsBuilder =
         Settings.settingsBuilder()
@@ -108,6 +105,9 @@ public class ElasticsearchIOTest implements Serializable {
     node = NodeBuilder.nodeBuilder().settings(settingsBuilder).build();
     LOG.info("Elasticsearch node created");
     node.start();
+    connectionConfiguration =
+      ElasticsearchIO.ConnectionConfiguration.create(
+        new String[] {"http://" + ES_IP + ":" + esHttpPort}, ES_INDEX, ES_TYPE);
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/beam/blob/2395d188/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
index 6ce89f1..bd8141f 100644
--- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
+++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.elasticsearch;
 
 import static java.net.InetAddress.getByName;
 
+import java.io.IOException;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
@@ -78,12 +79,12 @@ public class ElasticsearchTestDataSet {
             .addTransportAddress(
                 new InetSocketTransportAddress(
                     getByName(options.getElasticsearchServer()),
-                    Integer.valueOf(options.getElasticsearchTcpPort())));
+                    options.getElasticsearchTcpPort()));
     return client;
   }
 
   public static ElasticsearchIO.ConnectionConfiguration getConnectionConfiguration(
-      ElasticsearchTestOptions options, ReadOrWrite rOw) {
+      ElasticsearchTestOptions options, ReadOrWrite rOw) throws IOException {
     ElasticsearchIO.ConnectionConfiguration connectionConfiguration =
         ElasticsearchIO.ConnectionConfiguration.create(
             new String[] {
@@ -95,7 +96,7 @@ public class ElasticsearchTestDataSet {
             (rOw == ReadOrWrite.READ) ? ES_INDEX : writeIndex,
             ES_TYPE);
     return connectionConfiguration;
-  };
+  }
 
   public static void deleteIndex(TransportClient client, ReadOrWrite rOw) throws Exception {
     ElasticSearchIOTestUtils.deleteIndex((rOw == ReadOrWrite.READ) ? ES_INDEX : writeIndex, client);


[2/2] beam git commit: This closes #2230

Posted by jb...@apache.org.
This closes #2230


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/969acb7d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/969acb7d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/969acb7d

Branch: refs/heads/master
Commit: 969acb7da5e5d7d7a8e8bd6cce4376239b6e63c3
Parents: 25b52c5 2395d18
Author: Jean-Baptiste Onofr� <jb...@apache.org>
Authored: Fri Mar 17 22:51:57 2017 +0800
Committer: Jean-Baptiste Onofr� <jb...@apache.org>
Committed: Fri Mar 17 22:51:57 2017 +0800

----------------------------------------------------------------------
 .../sdk/io/elasticsearch/ElasticsearchIO.java   | 33 ++++++++++++++++----
 .../io/elasticsearch/ElasticsearchIOTest.java   |  6 ++--
 .../elasticsearch/ElasticsearchTestDataSet.java |  7 +++--
 3 files changed, 34 insertions(+), 12 deletions(-)
----------------------------------------------------------------------