You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2015/04/10 20:49:28 UTC

flume git commit: FLUME-2649. Elasticsearch sink doesn't handle JSON fields correctly

Repository: flume
Updated Branches:
  refs/heads/trunk 91ec57945 -> c77f1ac46


FLUME-2649. Elasticsearch sink doesn't handle JSON fields correctly

(Benjamin Fiorini via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c77f1ac4
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c77f1ac4
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c77f1ac4

Branch: refs/heads/trunk
Commit: c77f1ac469aac712e533cce7748683f1d2cc131a
Parents: 91ec579
Author: Hari Shreedharan <hs...@apache.org>
Authored: Fri Apr 10 11:48:20 2015 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Fri Apr 10 11:48:20 2015 -0700

----------------------------------------------------------------------
 .../sink/elasticsearch/ContentBuilderUtil.java      | 16 +++++++++-------
 .../AbstractElasticSearchSinkTest.java              | 13 +++++++++----
 .../sink/elasticsearch/TestElasticSearchSink.java   | 15 ++++++++++++---
 3 files changed, 30 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/c77f1ac4/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java
index 70d0b86..de0acf4 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java
@@ -55,18 +55,20 @@ public class ContentBuilderUtil {
 
   public static void addComplexField(XContentBuilder builder, String fieldName,
       XContentType contentType, byte[] data) throws IOException {
-    XContentParser parser = null;
+    XContentParser parser =
+      XContentFactory.xContent(contentType).createParser(data);
+    parser.nextToken();
+    // Add the field name, but not the value.
+    builder.field(fieldName);
     try {
-      XContentBuilder tmp = jsonBuilder();
-      parser = XContentFactory.xContent(contentType).createParser(data);
-      parser.nextToken();
-      tmp.copyCurrentStructure(parser);
-      builder.field(fieldName, tmp.string());
+      // This will add the whole parsed content as the value of the field.
+      builder.copyCurrentStructure(parser);
     } catch (JsonParseException ex) {
       // If we get an exception here the most likely cause is nested JSON that
       // can't be figured out in the body. At this point just push it through
       // as is, we have already added the field so don't do it again
-      addSimpleField(builder, fieldName, data);
+      builder.endObject();
+      builder.field(fieldName, new String(data, charset));
     } finally {
       if (parser != null) {
         parser.close();

http://git-wip-us.apache.org/repos/asf/flume/blob/c77f1ac4/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java
index 48eafdf..2f8fd6d 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java
@@ -121,13 +121,13 @@ public abstract class AbstractElasticSearchSinkTest {
 
   void assertMatchAllQuery(int expectedHits, Event... events) {
     assertSearch(expectedHits, performSearch(QueryBuilders.matchAllQuery()),
-        events);
+        null, events);
   }
 
   void assertBodyQuery(int expectedHits, Event... events) {
     // Perform Multi Field Match
     assertSearch(expectedHits,
-        performSearch(QueryBuilders.fieldQuery("@message", "event")));
+        performSearch(QueryBuilders.fieldQuery("@message", "event")), null);
   }
 
   SearchResponse performSearch(QueryBuilder query) {
@@ -135,7 +135,7 @@ public abstract class AbstractElasticSearchSinkTest {
         .setTypes(DEFAULT_INDEX_TYPE).setQuery(query).execute().actionGet();
   }
 
-  void assertSearch(int expectedHits, SearchResponse response, Event... events) {
+  void assertSearch(int expectedHits, SearchResponse response, Map<String, Object> expectedBody, Event... events) {
     SearchHits hitResponse = response.getHits();
     assertEquals(expectedHits, hitResponse.getTotalHits());
 
@@ -151,7 +151,12 @@ public abstract class AbstractElasticSearchSinkTest {
       Event event = events[i];
       SearchHit hit = hits[i];
       Map<String, Object> source = hit.getSource();
-      assertEquals(new String(event.getBody()), source.get("@message"));
+      if (expectedBody == null) {
+        assertEquals(new String(event.getBody()), source.get("@message"));
+      } else {
+        assertEquals(expectedBody, source.get("@message"));
+      }
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/c77f1ac4/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
index 3e11726..78e1665 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
@@ -51,6 +51,7 @@ import org.elasticsearch.client.Requests;
 import org.elasticsearch.common.UUID;
 import org.elasticsearch.common.io.BytesStream;
 import org.elasticsearch.common.io.FastByteArrayOutputStream;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -100,7 +101,8 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
 
     Transaction tx = channel.getTransaction();
     tx.begin();
-    Event event = EventBuilder.withBody("{\"event\":\"json content\"}".getBytes());
+    Event event = EventBuilder.withBody(
+        "{\"event\":\"json content\",\"num\":1}".getBytes());
     channel.put(event);
     tx.commit();
     tx.close();
@@ -110,8 +112,15 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
     client.admin().indices()
             .refresh(Requests.refreshRequest(timestampedIndexName)).actionGet();
 
-    assertMatchAllQuery(1, event);
-    assertBodyQuery(1, event);
+    Map<String, Object> expectedBody = new HashMap<String, Object>();
+    expectedBody.put("event", "json content");
+    expectedBody.put("num", 1);
+
+    assertSearch(1,
+        performSearch(QueryBuilders.matchAllQuery()), expectedBody, event);
+    assertSearch(1,
+        performSearch(QueryBuilders.fieldQuery("@message.event", "json")),
+        expectedBody, event);
   }
 
   @Test