You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2015/04/10 20:49:28 UTC
flume git commit: FLUME-2649. Elasticsearch sink doesn't handle JSON fields correctly
Repository: flume
Updated Branches:
refs/heads/trunk 91ec57945 -> c77f1ac46
FLUME-2649. Elasticsearch sink doesn't handle JSON fields correctly
(Benjamin Fiorini via Hari)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c77f1ac4
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c77f1ac4
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c77f1ac4
Branch: refs/heads/trunk
Commit: c77f1ac469aac712e533cce7748683f1d2cc131a
Parents: 91ec579
Author: Hari Shreedharan <hs...@apache.org>
Authored: Fri Apr 10 11:48:20 2015 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Fri Apr 10 11:48:20 2015 -0700
----------------------------------------------------------------------
.../sink/elasticsearch/ContentBuilderUtil.java | 16 +++++++++-------
.../AbstractElasticSearchSinkTest.java | 13 +++++++++----
.../sink/elasticsearch/TestElasticSearchSink.java | 15 ++++++++++++---
3 files changed, 30 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/c77f1ac4/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java
index 70d0b86..de0acf4 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java
@@ -55,18 +55,20 @@ public class ContentBuilderUtil {
public static void addComplexField(XContentBuilder builder, String fieldName,
XContentType contentType, byte[] data) throws IOException {
- XContentParser parser = null;
+ XContentParser parser =
+ XContentFactory.xContent(contentType).createParser(data);
+ parser.nextToken();
+ // Add the field name, but not the value.
+ builder.field(fieldName);
try {
- XContentBuilder tmp = jsonBuilder();
- parser = XContentFactory.xContent(contentType).createParser(data);
- parser.nextToken();
- tmp.copyCurrentStructure(parser);
- builder.field(fieldName, tmp.string());
+ // This will add the whole parsed content as the value of the field.
+ builder.copyCurrentStructure(parser);
} catch (JsonParseException ex) {
// If we get an exception here the most likely cause is nested JSON that
// can't be figured out in the body. At this point just push it through
// as is, we have already added the field so don't do it again
- addSimpleField(builder, fieldName, data);
+ builder.endObject();
+ builder.field(fieldName, new String(data, charset));
} finally {
if (parser != null) {
parser.close();
http://git-wip-us.apache.org/repos/asf/flume/blob/c77f1ac4/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java
index 48eafdf..2f8fd6d 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java
@@ -121,13 +121,13 @@ public abstract class AbstractElasticSearchSinkTest {
void assertMatchAllQuery(int expectedHits, Event... events) {
assertSearch(expectedHits, performSearch(QueryBuilders.matchAllQuery()),
- events);
+ null, events);
}
void assertBodyQuery(int expectedHits, Event... events) {
// Perform Multi Field Match
assertSearch(expectedHits,
- performSearch(QueryBuilders.fieldQuery("@message", "event")));
+ performSearch(QueryBuilders.fieldQuery("@message", "event")), null);
}
SearchResponse performSearch(QueryBuilder query) {
@@ -135,7 +135,7 @@ public abstract class AbstractElasticSearchSinkTest {
.setTypes(DEFAULT_INDEX_TYPE).setQuery(query).execute().actionGet();
}
- void assertSearch(int expectedHits, SearchResponse response, Event... events) {
+ void assertSearch(int expectedHits, SearchResponse response, Map<String, Object> expectedBody, Event... events) {
SearchHits hitResponse = response.getHits();
assertEquals(expectedHits, hitResponse.getTotalHits());
@@ -151,7 +151,12 @@ public abstract class AbstractElasticSearchSinkTest {
Event event = events[i];
SearchHit hit = hits[i];
Map<String, Object> source = hit.getSource();
- assertEquals(new String(event.getBody()), source.get("@message"));
+ if (expectedBody == null) {
+ assertEquals(new String(event.getBody()), source.get("@message"));
+ } else {
+ assertEquals(expectedBody, source.get("@message"));
+ }
}
}
+
}
http://git-wip-us.apache.org/repos/asf/flume/blob/c77f1ac4/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
index 3e11726..78e1665 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
@@ -51,6 +51,7 @@ import org.elasticsearch.client.Requests;
import org.elasticsearch.common.UUID;
import org.elasticsearch.common.io.BytesStream;
import org.elasticsearch.common.io.FastByteArrayOutputStream;
+import org.elasticsearch.index.query.QueryBuilders;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -100,7 +101,8 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
Transaction tx = channel.getTransaction();
tx.begin();
- Event event = EventBuilder.withBody("{\"event\":\"json content\"}".getBytes());
+ Event event = EventBuilder.withBody(
+ "{\"event\":\"json content\",\"num\":1}".getBytes());
channel.put(event);
tx.commit();
tx.close();
@@ -110,8 +112,15 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
client.admin().indices()
.refresh(Requests.refreshRequest(timestampedIndexName)).actionGet();
- assertMatchAllQuery(1, event);
- assertBodyQuery(1, event);
+ Map<String, Object> expectedBody = new HashMap<String, Object>();
+ expectedBody.put("event", "json content");
+ expectedBody.put("num", 1);
+
+ assertSearch(1,
+ performSearch(QueryBuilders.matchAllQuery()), expectedBody, event);
+ assertSearch(1,
+ performSearch(QueryBuilders.fieldQuery("@message.event", "json")),
+ expectedBody, event);
}
@Test