You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/03/07 20:44:27 UTC

[1/2] beam git commit: Change Json parsing from gson to jackson for ElasticsearchIO

Repository: beam
Updated Branches:
  refs/heads/master b74e18654 -> b45381dad


Change Json parsing from gson to jackson for ElasticsearchIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0694344e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0694344e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0694344e

Branch: refs/heads/master
Commit: 0694344ee03520bae24a2b484a28927579e2bb7e
Parents: b74e186
Author: Ismaël Mejía <ie...@gmail.com>
Authored: Mon Mar 6 09:13:31 2017 +0100
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Tue Mar 7 21:28:30 2017 +0100

----------------------------------------------------------------------
 sdks/java/io/elasticsearch/pom.xml              |   7 +-
 .../sdk/io/elasticsearch/ElasticsearchIO.java   | 102 +++++++++----------
 2 files changed, 52 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0694344e/sdks/java/io/elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/pom.xml b/sdks/java/io/elasticsearch/pom.xml
index 91a7ae6..eecfe6b 100644
--- a/sdks/java/io/elasticsearch/pom.xml
+++ b/sdks/java/io/elasticsearch/pom.xml
@@ -47,9 +47,8 @@
     </dependency>
 
     <dependency>
-      <groupId>com.google.code.gson</groupId>
-      <artifactId>gson</artifactId>
-      <version>2.6.2</version>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
     </dependency>
 
     <dependency>
@@ -116,7 +115,7 @@
     </dependency>
 
     <dependency>
-      <groupId>org.apache.commons</groupId>
+      <groupId>commons-io</groupId>
       <artifactId>commons-io</artifactId>
       <version>1.3.2</version>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/beam/blob/0694344e/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index 5073834..b08cb24 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -20,17 +20,13 @@ package org.apache.beam.sdk.io.elasticsearch;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
-import com.google.gson.Gson;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.io.Serializable;
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -38,11 +34,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.Set;
 import javax.annotation.Nullable;
 
 import org.apache.beam.sdk.coders.Coder;
@@ -140,11 +136,10 @@ public class ElasticsearchIO {
 
   private ElasticsearchIO() {}
 
-  private static JsonObject parseResponse(Response response) throws IOException {
-    InputStream content = response.getEntity().getContent();
-    InputStreamReader inputStreamReader = new InputStreamReader(content, "UTF-8");
-    JsonObject jsonObject = new Gson().fromJson(inputStreamReader, JsonObject.class);
-    return jsonObject;
+  private static final ObjectMapper mapper = new ObjectMapper();
+
+  private static JsonNode parseResponse(Response response) throws IOException {
+    return mapper.readValue(response.getEntity().getContent(), JsonNode.class);
   }
 
   /** A POJO describing a connection configuration to Elasticsearch. */
@@ -428,23 +423,24 @@ public class ElasticsearchIO {
       // But, as each shard (replica or primary) is responsible for only one part of the data,
       // there will be no duplicate.
 
-      JsonObject statsJson = getStats(true);
-      JsonObject shardsJson =
+      JsonNode statsJson = getStats(true);
+      JsonNode shardsJson =
           statsJson
-              .getAsJsonObject("indices")
-              .getAsJsonObject(spec.getConnectionConfiguration().getIndex())
-              .getAsJsonObject("shards");
-      Set<Map.Entry<String, JsonElement>> shards = shardsJson.entrySet();
-      for (Map.Entry<String, JsonElement> shardJson : shards) {
+              .path("indices")
+              .path(spec.getConnectionConfiguration().getIndex())
+              .path("shards");
+
+      Iterator<Map.Entry<String, JsonNode>> shards = shardsJson.fields();
+      while (shards.hasNext()) {
+        Map.Entry<String, JsonNode> shardJson = shards.next();
         String shardId = shardJson.getKey();
-        JsonArray value = (JsonArray) shardJson.getValue();
+        JsonNode value = (JsonNode) shardJson.getValue();
         boolean isPrimaryShard =
             value
-                .get(0)
-                .getAsJsonObject()
-                .getAsJsonObject("routing")
-                .getAsJsonPrimitive("primary")
-                .getAsBoolean();
+                .path(0)
+                .path("routing")
+                .path("primary")
+                .asBoolean();
         if (isPrimaryShard) {
           sources.add(new BoundedElasticsearchSource(spec, shardId));
         }
@@ -463,14 +459,14 @@ public class ElasticsearchIO {
       // NB: Elasticsearch 5.x now provides the slice API.
       // (https://www.elastic.co/guide/en/elasticsearch/reference/5.0/search-request-scroll.html
       // #sliced-scroll)
-      JsonObject statsJson = getStats(false);
-      JsonObject indexStats =
+      JsonNode statsJson = getStats(false);
+      JsonNode indexStats =
           statsJson
-              .getAsJsonObject("indices")
-              .getAsJsonObject(spec.getConnectionConfiguration().getIndex())
-              .getAsJsonObject("primaries");
-      JsonObject store = indexStats.getAsJsonObject("store");
-      return store.getAsJsonPrimitive("size_in_bytes").getAsLong();
+              .path("indices")
+              .path(spec.getConnectionConfiguration().getIndex())
+              .path("primaries");
+      JsonNode store = indexStats.path("store");
+      return store.path("size_in_bytes").asLong();
     }
 
     @Override
@@ -494,7 +490,7 @@ public class ElasticsearchIO {
       return StringUtf8Coder.of();
     }
 
-    private JsonObject getStats(boolean shardLevel) throws IOException {
+    private JsonNode getStats(boolean shardLevel) throws IOException {
       HashMap<String, String> params = new HashMap<>();
       if (shardLevel) {
         params.put("level", "shards");
@@ -544,13 +540,13 @@ public class ElasticsearchIO {
       HttpEntity queryEntity = new NStringEntity(query, ContentType.APPLICATION_JSON);
       response =
           restClient.performRequest("GET", endPoint, params, queryEntity, new BasicHeader("", ""));
-      JsonObject searchResult = parseResponse(response);
+      JsonNode searchResult = parseResponse(response);
       updateScrollId(searchResult);
       return readNextBatchAndReturnFirstDocument(searchResult);
     }
 
-    private void updateScrollId(JsonObject searchResult) {
-      scrollId = searchResult.getAsJsonPrimitive("_scroll_id").getAsString();
+    private void updateScrollId(JsonNode searchResult) {
+      scrollId = searchResult.path("_scroll_id").asText();
     }
 
     @Override
@@ -571,15 +567,15 @@ public class ElasticsearchIO {
                 Collections.<String, String>emptyMap(),
                 scrollEntity,
                 new BasicHeader("", ""));
-        JsonObject searchResult = parseResponse(response);
+        JsonNode searchResult = parseResponse(response);
         updateScrollId(searchResult);
         return readNextBatchAndReturnFirstDocument(searchResult);
       }
     }
 
-    private boolean readNextBatchAndReturnFirstDocument(JsonObject searchResult) {
+    private boolean readNextBatchAndReturnFirstDocument(JsonNode searchResult) {
       //stop if no more data
-      JsonArray hits = searchResult.getAsJsonObject("hits").getAsJsonArray("hits");
+      JsonNode hits = searchResult.path("hits").path("hits");
       if (hits.size() == 0) {
         current = null;
         batchIterator = null;
@@ -587,8 +583,8 @@ public class ElasticsearchIO {
       }
       // list behind iterator is empty
       List<String> batch = new ArrayList<>();
-      for (JsonElement hit : hits) {
-        String document = hit.getAsJsonObject().getAsJsonObject("_source").toString();
+      for (JsonNode hit : hits) {
+        String document = hit.path("_source").toString();
         batch.add(document);
       }
       batchIterator = batch.listIterator();
@@ -780,26 +776,26 @@ public class ElasticsearchIO {
                 Collections.<String, String>emptyMap(),
                 requestBody,
                 new BasicHeader("", ""));
-        JsonObject searchResult = parseResponse(response);
-        boolean errors = searchResult.getAsJsonPrimitive("errors").getAsBoolean();
+        JsonNode searchResult = parseResponse(response);
+        boolean errors = searchResult.path("errors").asBoolean();
         if (errors) {
           StringBuilder errorMessages =
               new StringBuilder(
                   "Error writing to Elasticsearch, some elements could not be inserted:");
-          JsonArray items = searchResult.getAsJsonArray("items");
+          JsonNode items = searchResult.path("items");
           //some items present in bulk might have errors, concatenate error messages
-          for (JsonElement item : items) {
-            JsonObject creationObject = item.getAsJsonObject().getAsJsonObject("create");
-            JsonObject error = creationObject.getAsJsonObject("error");
+          for (JsonNode item : items) {
+            JsonNode creationObject = item.path("create");
+            JsonNode error = creationObject.get("error");
             if (error != null) {
-              String type = error.getAsJsonPrimitive("type").getAsString();
-              String reason = error.getAsJsonPrimitive("reason").getAsString();
-              String docId = creationObject.getAsJsonPrimitive("_id").getAsString();
+              String type = error.path("type").asText();
+              String reason = error.path("reason").asText();
+              String docId = creationObject.path("_id").asText();
               errorMessages.append(String.format("%nDocument id %s: %s (%s)", docId, reason, type));
-              JsonObject causedBy = error.getAsJsonObject("caused_by");
+              JsonNode causedBy = error.get("caused_by");
               if (causedBy != null) {
-                String cbReason = causedBy.getAsJsonPrimitive("reason").getAsString();
-                String cbType = causedBy.getAsJsonPrimitive("type").getAsString();
+                String cbReason = causedBy.path("reason").asText();
+                String cbType = causedBy.path("type").asText();
                 errorMessages.append(String.format("%nCaused by: %s (%s)", cbReason, cbType));
               }
             }


[2/2] beam git commit: This closes #2167

Posted by jb...@apache.org.
This closes #2167


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b45381da
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b45381da
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b45381da

Branch: refs/heads/master
Commit: b45381dad4f8f94f4218db114089b97c740a968e
Parents: b74e186 0694344
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Tue Mar 7 21:43:56 2017 +0100
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Tue Mar 7 21:43:56 2017 +0100

----------------------------------------------------------------------
 sdks/java/io/elasticsearch/pom.xml              |   7 +-
 .../sdk/io/elasticsearch/ElasticsearchIO.java   | 102 +++++++++----------
 2 files changed, 52 insertions(+), 57 deletions(-)
----------------------------------------------------------------------