Posted to dev@flume.apache.org by GitBox <gi...@apache.org> on 2020/03/27 03:05:23 UTC
[GitHub] [flume] AndreLouisCaron opened a new pull request #322: Honor UUID
interceptor in ElasticSearch sink
AndreLouisCaron opened a new pull request #322: Honor UUID interceptor in ElasticSearch sink
URL: https://github.com/apache/flume/pull/322
ElasticSearch supports deduplication of events through the `_id` field
when documents are inserted with the bulk insert facility. When the UUID
interceptor is configured with `headerName = _id` for this purpose, the
logstash serializer renames the header to `@fields._id`, which is then
stored like any other field instead of being used as the document ID.

To enable deduplication of events in the ElasticSearch sink, the header
containing the document ID must be sent as `_id`, no matter what its
original name is.
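The mapping described above can be sketched in isolation. This is a minimal illustration, not the PR's code: the class `BulkActionSketch`, its method names, and the constant values are assumptions made for the example, and the JSON is built by hand without escaping to keep it dependency-free. It shows a header with an arbitrary name ending up as `_id` in the action metadata line of a bulk request.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Flume.
final class BulkActionSketch {
  // Assumed parameter names for this sketch.
  static final String INDEX_PARAM = "_index";
  static final String TYPE_PARAM = "_type";
  static final String ID_PARAM = "_id";

  // Builds the index parameters, copying the configured header into _id when present.
  static Map<String, String> indexParameters(Map<String, String> headers,
                                             String indexName,
                                             String indexType,
                                             String idHeaderName) {
    Map<String, String> params = new LinkedHashMap<>();
    params.put(INDEX_PARAM, indexName);
    params.put(TYPE_PARAM, indexType);
    if (idHeaderName != null) {
      String id = headers.get(idHeaderName);
      if (id != null) {
        params.put(ID_PARAM, id);
      }
    }
    return params;
  }

  // Renders the bulk action line. Values are not JSON-escaped in this sketch.
  static String actionLine(Map<String, String> params) {
    StringBuilder sb = new StringBuilder("{\"index\":{");
    boolean first = true;
    for (Map.Entry<String, String> e : params.entrySet()) {
      if (!first) {
        sb.append(',');
      }
      sb.append('"').append(e.getKey()).append("\":\"").append(e.getValue()).append('"');
      first = false;
    }
    return sb.append("}}").toString();
  }

  public static void main(String[] args) {
    // The header name "eventId" is an arbitrary example, not a Flume default.
    Map<String, String> headers = Collections.singletonMap("eventId", "abc-123");
    System.out.println(actionLine(indexParameters(headers, "logs-2020-03-27", "log", "eventId")));
  }
}
```

When the header is absent or no header name is configured, the sketch simply omits `_id`, which leaves ID generation to the store.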

The dynamic serializer probably does not have the same limitation, but
it currently relies on the UUID interceptor's `headerName` property
being set to `_id`. This is not the default, and it is unlikely to work
in complex Flume networks that replicate events to multiple sinks.
Therefore, I fixed the dynamic serializer to support the same logic as
the logstash serializer.
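To illustrate why the header name should not matter, here is a small in-memory model, assuming a store that keys documents by `_id` and overwrites on a repeated ID. The class `DedupSketch` and its methods are hypothetical and only stand in for an index; this is not ElasticSearch or Flume code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for an index keyed by document ID.
final class DedupSketch {
  private final Map<String, String> docsById = new LinkedHashMap<>();
  private int autoId = 0;

  // Indexes a document, taking its ID from the configured header if available.
  void index(Map<String, String> headers, String idHeaderName, String body) {
    String id = (idHeaderName == null) ? null : headers.get(idHeaderName);
    if (id == null) {
      // Models a store-generated ID: every insert becomes a new document.
      id = "auto-" + (autoId++);
    }
    // A repeated ID overwrites the earlier document instead of adding a new one.
    docsById.put(id, body);
  }

  int size() {
    return docsById.size();
  }
}
```

In this model, delivering the same event twice with the ID header configured (for example a header named `id`) leaves one document, while delivering it twice without a configured header leaves two.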
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services
[GitHub] [flume] AndreLouisCaron commented on a change in pull request #322:
Honor UUID interceptor in ElasticSearch sink
Posted by GitBox <gi...@apache.org>.
AndreLouisCaron commented on a change in pull request #322: Honor UUID interceptor in ElasticSearch sink
URL: https://github.com/apache/flume/pull/322#discussion_r399456518
##########
File path: flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java
##########
@@ -97,11 +106,18 @@ public void addEvent(Event event, IndexNameBuilder indexNameBuilder, String inde
     BytesReference content = serializer.getContentBuilder(event).bytes();
     Map<String, Map<String, String>> parameters = new HashMap<String, Map<String, String>>();
     Map<String, String> indexParameters = new HashMap<String, String>();
+    Map<String, String> headers = event.getHeaders();
     indexParameters.put(INDEX_PARAM, indexNameBuilder.getIndexName(event));
     indexParameters.put(TYPE_PARAM, indexType);
     if (ttlMs > 0) {
       indexParameters.put(TTL_PARAM, Long.toString(ttlMs));
     }
+    if (idHeaderName != null) {
+      String id = headers.get(idHeaderName);
+      if (id != null) {
+        indexParameters.put(ID_PARAM, id);
Review comment:
Note to reviewers: everything else in this PR exists to support this line.