You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/03/08 22:25:17 UTC
[2/9] beam git commit: Change Json parsing from gson to jackson for ElasticsearchIO
Change Json parsing from gson to jackson for ElasticsearchIO
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d5257658
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d5257658
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d5257658
Branch: refs/heads/release-0.6.0
Commit: d5257658f094fe8c2a8668027bbdd4a26396ba0b
Parents: 8ab36fa
Author: Ismaël Mejía <ie...@gmail.com>
Authored: Mon Mar 6 09:13:31 2017 +0100
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Mar 8 13:38:39 2017 -0800
----------------------------------------------------------------------
sdks/java/io/elasticsearch/pom.xml | 7 +-
.../sdk/io/elasticsearch/ElasticsearchIO.java | 102 +++++++++----------
2 files changed, 52 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d5257658/sdks/java/io/elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/pom.xml b/sdks/java/io/elasticsearch/pom.xml
index 3279dfd..5ea4452 100644
--- a/sdks/java/io/elasticsearch/pom.xml
+++ b/sdks/java/io/elasticsearch/pom.xml
@@ -47,9 +47,8 @@
</dependency>
<dependency>
- <groupId>com.google.code.gson</groupId>
- <artifactId>gson</artifactId>
- <version>2.6.2</version>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
@@ -116,7 +115,7 @@
</dependency>
<dependency>
- <groupId>org.apache.commons</groupId>
+ <groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>1.3.2</version>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/beam/blob/d5257658/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index 5073834..b08cb24 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -20,17 +20,13 @@ package org.apache.beam.sdk.io.elasticsearch;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
-import com.google.gson.Gson;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
import java.io.Serializable;
import java.net.MalformedURLException;
import java.net.URL;
@@ -38,11 +34,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.NoSuchElementException;
-import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
@@ -140,11 +136,10 @@ public class ElasticsearchIO {
private ElasticsearchIO() {}
- private static JsonObject parseResponse(Response response) throws IOException {
- InputStream content = response.getEntity().getContent();
- InputStreamReader inputStreamReader = new InputStreamReader(content, "UTF-8");
- JsonObject jsonObject = new Gson().fromJson(inputStreamReader, JsonObject.class);
- return jsonObject;
+ private static final ObjectMapper mapper = new ObjectMapper();
+
+ private static JsonNode parseResponse(Response response) throws IOException {
+ return mapper.readValue(response.getEntity().getContent(), JsonNode.class);
}
/** A POJO describing a connection configuration to Elasticsearch. */
@@ -428,23 +423,24 @@ public class ElasticsearchIO {
// But, as each shard (replica or primary) is responsible for only one part of the data,
// there will be no duplicate.
- JsonObject statsJson = getStats(true);
- JsonObject shardsJson =
+ JsonNode statsJson = getStats(true);
+ JsonNode shardsJson =
statsJson
- .getAsJsonObject("indices")
- .getAsJsonObject(spec.getConnectionConfiguration().getIndex())
- .getAsJsonObject("shards");
- Set<Map.Entry<String, JsonElement>> shards = shardsJson.entrySet();
- for (Map.Entry<String, JsonElement> shardJson : shards) {
+ .path("indices")
+ .path(spec.getConnectionConfiguration().getIndex())
+ .path("shards");
+
+ Iterator<Map.Entry<String, JsonNode>> shards = shardsJson.fields();
+ while (shards.hasNext()) {
+ Map.Entry<String, JsonNode> shardJson = shards.next();
String shardId = shardJson.getKey();
- JsonArray value = (JsonArray) shardJson.getValue();
+ JsonNode value = (JsonNode) shardJson.getValue();
boolean isPrimaryShard =
value
- .get(0)
- .getAsJsonObject()
- .getAsJsonObject("routing")
- .getAsJsonPrimitive("primary")
- .getAsBoolean();
+ .path(0)
+ .path("routing")
+ .path("primary")
+ .asBoolean();
if (isPrimaryShard) {
sources.add(new BoundedElasticsearchSource(spec, shardId));
}
@@ -463,14 +459,14 @@ public class ElasticsearchIO {
// NB: Elasticsearch 5.x now provides the slice API.
// (https://www.elastic.co/guide/en/elasticsearch/reference/5.0/search-request-scroll.html
// #sliced-scroll)
- JsonObject statsJson = getStats(false);
- JsonObject indexStats =
+ JsonNode statsJson = getStats(false);
+ JsonNode indexStats =
statsJson
- .getAsJsonObject("indices")
- .getAsJsonObject(spec.getConnectionConfiguration().getIndex())
- .getAsJsonObject("primaries");
- JsonObject store = indexStats.getAsJsonObject("store");
- return store.getAsJsonPrimitive("size_in_bytes").getAsLong();
+ .path("indices")
+ .path(spec.getConnectionConfiguration().getIndex())
+ .path("primaries");
+ JsonNode store = indexStats.path("store");
+ return store.path("size_in_bytes").asLong();
}
@Override
@@ -494,7 +490,7 @@ public class ElasticsearchIO {
return StringUtf8Coder.of();
}
- private JsonObject getStats(boolean shardLevel) throws IOException {
+ private JsonNode getStats(boolean shardLevel) throws IOException {
HashMap<String, String> params = new HashMap<>();
if (shardLevel) {
params.put("level", "shards");
@@ -544,13 +540,13 @@ public class ElasticsearchIO {
HttpEntity queryEntity = new NStringEntity(query, ContentType.APPLICATION_JSON);
response =
restClient.performRequest("GET", endPoint, params, queryEntity, new BasicHeader("", ""));
- JsonObject searchResult = parseResponse(response);
+ JsonNode searchResult = parseResponse(response);
updateScrollId(searchResult);
return readNextBatchAndReturnFirstDocument(searchResult);
}
- private void updateScrollId(JsonObject searchResult) {
- scrollId = searchResult.getAsJsonPrimitive("_scroll_id").getAsString();
+ private void updateScrollId(JsonNode searchResult) {
+ scrollId = searchResult.path("_scroll_id").asText();
}
@Override
@@ -571,15 +567,15 @@ public class ElasticsearchIO {
Collections.<String, String>emptyMap(),
scrollEntity,
new BasicHeader("", ""));
- JsonObject searchResult = parseResponse(response);
+ JsonNode searchResult = parseResponse(response);
updateScrollId(searchResult);
return readNextBatchAndReturnFirstDocument(searchResult);
}
}
- private boolean readNextBatchAndReturnFirstDocument(JsonObject searchResult) {
+ private boolean readNextBatchAndReturnFirstDocument(JsonNode searchResult) {
//stop if no more data
- JsonArray hits = searchResult.getAsJsonObject("hits").getAsJsonArray("hits");
+ JsonNode hits = searchResult.path("hits").path("hits");
if (hits.size() == 0) {
current = null;
batchIterator = null;
@@ -587,8 +583,8 @@ public class ElasticsearchIO {
}
// list behind iterator is empty
List<String> batch = new ArrayList<>();
- for (JsonElement hit : hits) {
- String document = hit.getAsJsonObject().getAsJsonObject("_source").toString();
+ for (JsonNode hit : hits) {
+ String document = hit.path("_source").toString();
batch.add(document);
}
batchIterator = batch.listIterator();
@@ -780,26 +776,26 @@ public class ElasticsearchIO {
Collections.<String, String>emptyMap(),
requestBody,
new BasicHeader("", ""));
- JsonObject searchResult = parseResponse(response);
- boolean errors = searchResult.getAsJsonPrimitive("errors").getAsBoolean();
+ JsonNode searchResult = parseResponse(response);
+ boolean errors = searchResult.path("errors").asBoolean();
if (errors) {
StringBuilder errorMessages =
new StringBuilder(
"Error writing to Elasticsearch, some elements could not be inserted:");
- JsonArray items = searchResult.getAsJsonArray("items");
+ JsonNode items = searchResult.path("items");
//some items present in bulk might have errors, concatenate error messages
- for (JsonElement item : items) {
- JsonObject creationObject = item.getAsJsonObject().getAsJsonObject("create");
- JsonObject error = creationObject.getAsJsonObject("error");
+ for (JsonNode item : items) {
+ JsonNode creationObject = item.path("create");
+ JsonNode error = creationObject.get("error");
if (error != null) {
- String type = error.getAsJsonPrimitive("type").getAsString();
- String reason = error.getAsJsonPrimitive("reason").getAsString();
- String docId = creationObject.getAsJsonPrimitive("_id").getAsString();
+ String type = error.path("type").asText();
+ String reason = error.path("reason").asText();
+ String docId = creationObject.path("_id").asText();
errorMessages.append(String.format("%nDocument id %s: %s (%s)", docId, reason, type));
- JsonObject causedBy = error.getAsJsonObject("caused_by");
+ JsonNode causedBy = error.get("caused_by");
if (causedBy != null) {
- String cbReason = causedBy.getAsJsonPrimitive("reason").getAsString();
- String cbType = causedBy.getAsJsonPrimitive("type").getAsString();
+ String cbReason = causedBy.path("reason").asText();
+ String cbType = causedBy.path("type").asText();
errorMessages.append(String.format("%nCaused by: %s (%s)", cbReason, cbType));
}
}