You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flume.apache.org by GitBox <gi...@apache.org> on 2020/03/27 03:05:23 UTC

[GitHub] [flume] AndreLouisCaron opened a new pull request #322: Honor UUID interceptor in ElasticSearch sink

AndreLouisCaron opened a new pull request #322: Honor UUID interceptor in ElasticSearch sink
URL: https://github.com/apache/flume/pull/322
 
 
   ElasticSearch provides support for deduplication of events using the
   `_id` field when inserting documents using the bulk insert facility.
   
   When using the UUID interceptor with `headerName = _id` for this
   purpose, the logstash serializer will rename this as `@fields._id`,
   which is stored as though it were any other field.
   
   In order to enable deduplication of events in the ElasticSearch sink, we
   need to ensure the header containing the document ID is sent as `_id`,
   no matter what its original name is.
   
   The dynamic serializer probably does not have the same limitation, but
   it currently counts on the UUID interceptor's `headerName` property to
   be set to `_id`.  This is not the default and it is unlikely to work in
   complex Flume networks that replicate events to multiple sinks.
   Therefore, I fixed it to support the same logic as the logstash
   serializer.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flume] AndreLouisCaron commented on a change in pull request #322: Honor UUID interceptor in ElasticSearch sink

Posted by GitBox <gi...@apache.org>.
AndreLouisCaron commented on a change in pull request #322: Honor UUID interceptor in ElasticSearch sink
URL: https://github.com/apache/flume/pull/322#discussion_r399456518
 
 

 ##########
 File path: flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java
 ##########
 @@ -97,11 +106,18 @@ public void addEvent(Event event, IndexNameBuilder indexNameBuilder, String inde
     BytesReference content = serializer.getContentBuilder(event).bytes();
     Map<String, Map<String, String>> parameters = new HashMap<String, Map<String, String>>();
     Map<String, String> indexParameters = new HashMap<String, String>();
+    Map<String, String> headers = event.getHeaders();
     indexParameters.put(INDEX_PARAM, indexNameBuilder.getIndexName(event));
     indexParameters.put(TYPE_PARAM, indexType);
     if (ttlMs > 0) {
       indexParameters.put(TTL_PARAM, Long.toString(ttlMs));
     }
+    if (idHeaderName != null) {
+      String id = headers.get(idHeaderName);
+      if (id != null) {
+        indexParameters.put(ID_PARAM, id);
 
 Review comment:
   Note to reviewers: everything else in this PR exists to support this line.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services