Posted to dev@flume.apache.org by "Tianji Li (JIRA)" <ji...@apache.org> on 2015/10/02 05:12:26 UTC

[jira] [Comment Edited] (FLUME-2787) org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer does not serialize @timestamp correctly

    [ https://issues.apache.org/jira/browse/FLUME-2787?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14940488#comment-14940488 ] 

Tianji Li edited comment on FLUME-2787 at 10/2/15 3:12 AM:
-----------------------------------------------------------

We experienced similar issues in our work and figured out a way to fix it. 

In essence, ElasticSearch is not really schemaless, because the underlying Lucene requires strict schemas. ElasticSearch's schemaless behavior is achieved by guessing what data format to use as data comes in. But the guessing sometimes fails, and fails silently, leaving people pulling their hair out trying to understand why.

The way to avoid this guesswork is therefore to explicitly tell ElasticSearch what your schema is before sending any data in.

There are two ways to achieve this.

First, use the index templates provided by ElasticSearch (https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html). The idea of these templates is that one can set mappings ('mapping' means schema in ElasticSearch terms) for many indices before using them. This is not a good option when using Flume and ElasticSearch together. The reason is that Flume automatically rotates the indices on a daily basis (which is a good thing when indices are huge), so to use templates one has to keep the rotation in mind at all times and update the templates at exactly the right moments.
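For illustration, a template along these lines would pre-register a mapping for every daily index matching the name pattern. This is only a sketch: the template name 'flume_template', the 'flume-*' index pattern, and the host/port are hypothetical, and the syntax assumes the ElasticSearch 1.x _template API.

curl -XPUT 'http://localhost:9200/_template/flume_template' -d '
{
    "template": "flume-*",
    "mappings": {
        "logs": {
            "properties": {
                "timestamp": {
                    "type": "date",
                    "format": "dateOptionalTime"
                }
            }
        }
    }
}'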

Second, and this is the option we prefer, let Flume set the mappings/schemas when needed. Again, because of Flume's index rotation, setting mappings should be done in the EventSerializers. I implemented this in the attached patch.
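Conceptually, before the first event is written into a freshly rotated index, the serializer registers the desired mapping, which is equivalent to a put-mapping call like the following (a sketch only, assuming the ElasticSearch 1.x put-mapping API, the default host/port, and the daily index name used later in this comment):

curl -XPUT 'http://localhost:9200/manual-2015-10-01/_mapping/logs' -d '
{
    "logs": {
        "properties": {
            "body": {
                "type": "string",
                "index": "not_analyzed"
            },
            "timestamp": {
                "type": "date",
                "format": "dateOptionalTime"
            }
        }
    }
}'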

Now, let me give the results.

If not setting agent.sinks.elastic-sink.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer, then, rather than the fields mentioned by [~goi.cto] above, the field names I got are not 'body' and 'timestamp' but the ones shown below (please check the details in the noserializer.conf file in the patch):
{
   "noserializer-2015-10-01": {
      "mappings": {
         "logs": {
            "properties": {
               "@fields": {
                  "properties": {
                     "timestamp": {
                        "type": "string"
                     }
                  }
               },
               "@message": {
                  "type": "string"
               },
               "@timestamp": {
                  "type": "date",
                  "format": "dateOptionalTime"
               }
            }
         }
      }
   }
}
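For context, without an explicit serializer the sink falls back to the LogStash-style event format, so each event is indexed as a document shaped roughly like this (the values are illustrative; the timestamp header produced by the TimestampInterceptor is epoch milliseconds rendered as a string, which is why @fields.timestamp is mapped as a string):

{
    "@message": "<the raw log line>",
    "@timestamp": "2015-10-01T03:12:26.000Z",
    "@fields": {
        "timestamp": "1443755546000"
    }
}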

If using agent.sinks.elastic-sink.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer, then, just as [~goi.cto] said, the 'timestamp' field is a string rather than a 'date'. Please check the details in the dynamic.conf file in the patch.
{
   "dynamic-2015-10-01": {
      "mappings": {
         "logs": {
            "properties": {
               "body": {
                  "type": "string"
               },
               "timestamp": {
                  "type": "string"
               }
            }
         }
      }
   }
}

If using the ElasticSearchManualSerializer I implemented, the mapping will be as shown below. See manual.conf in the patch for how to use it.
{
   "manual-2015-10-01": {
      "mappings": {
         "logs": {
            "_timestamp": {
               "enabled": true,
               "store": true,
               "path": "timestamp",
               "format": "date_time",
               "default": "1970-01-01T00:00:00.000Z"
            },
            "_index": {
               "enabled": true
            },
            "properties": {
               "body": {
                  "type": "string",
                  "index": "not_analyzed",
                  "doc_values": true
               },
               "timestamp": {
                  "type": "date",
                  "format": "dateOptionalTime"
               }
            }
         }
      }
   }
} 
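For reference, mapping dumps like the ones above can be retrieved from a running cluster with the standard get-mapping call (the host and port here are the defaults and may differ in your setup):

curl -XGET 'http://localhost:9200/manual-2015-10-01/_mapping?pretty'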

In our work, and in many other use cases (https://www.elastic.co/guide/en/elasticsearch/guide/current/aggregations-and-analysis.html), manual mapping is needed on string fields such as people's full names. Say we index FirstName and LastName in the same field and then run a terms aggregation to get the distinct full names. If we use org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer, the field will be 'analyzed' by default, which means ElasticSearch will index firstName and lastName separately. As another example, if we index:
  New York
  New York
  New York
  New Mexico
  New Jersey
in a field that is analyzed, then this terms aggregation
{
    "size" : 0,
    "aggs" : {
        "bodyTerms" : {
            "terms" : {
                "field" : "body"
            }
        }
    }
}
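(this can be run with, e.g., curl -XPOST 'http://localhost:9200/dynamic-2015-10-01/_search?pretty' and the JSON above as the request body; the host, port, and index name follow the earlier examples and may differ in your setup)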

will give us:
{
   "took": 1,
   "timed_out": false,
   "_shards": {
      "total": 5,
      "successful": 5,
      "failed": 0
   },
   "hits": {
      "total": 5,
      "max_score": 0,
      "hits": []
   },
   "aggregations": {
      "bodyTerms": {
         "doc_count_error_upper_bound": 0,
         "sum_other_doc_count": 0,
         "buckets": [
            {
               "key": "new",
               "doc_count": 5
            },
            {
               "key": "york",
               "doc_count": 3
            },
            {
               "key": "jersey",
               "doc_count": 1
            },
            {
               "key": "mexico",
               "doc_count": 1
            }
         ]
      }
   }
}


To resolve this, we have to change the field to 'not_analyzed', which (using the org.apache.flume.sink.elasticsearch.ElasticSearchManualSerializer in the patch) gives us:
{
   "took": 2,
   "timed_out": false,
   "_shards": {
      "total": 5,
      "successful": 5,
      "failed": 0
   },
   "hits": {
      "total": 5,
      "max_score": 0,
      "hits": []
   },
   "aggregations": {
      "bodyTerms": {
         "doc_count_error_upper_bound": 0,
         "sum_other_doc_count": 0,
         "buckets": [
            {
               "key": "New York",
               "doc_count": 3
            },
            {
               "key": "New Jersey",
               "doc_count": 1
            },
            {
               "key": "New Mexico",
               "doc_count": 1
            }
         ]
      }
   }
}




> org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer does not serialize @timestamp correctly
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: FLUME-2787
>                 URL: https://issues.apache.org/jira/browse/FLUME-2787
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>            Reporter: Eran W
>            Priority: Minor
>         Attachments: FLUME-2787.2.patch, FLUME-2787.patch
>
>
> When using 
> agent.sinks.elastic-sink.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
> the event timestamp is stored as a string and not dateOptionalTime. If agent.sinks.elastic-sink.serializer is not set, the code works as expected.
> REPRO:
> 1) use the following config
> agent.channels.channel-elastic.type = memory
> agent.channels.channel-elastic.capacity = 1000
> agent.channels.channel-elastic.transactionCapacity = 100
> # Define a source on agent and connect to channel.
> agent.sources.tail-source.type = exec
> agent.sources.tail-source.command = tail -4000 /home/cto/hs_err_pid11679.log
> agent.sources.tail-source.channels = channel-elastic
> #####INTERCEPTORS
> agent.sources.tail-source.interceptors = timestampInterceptor
> agent.sources.tail-source.interceptors.timestampInterceptor.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
> agent.sources.tail-source.interceptors.timestampInterceptor.preserveExisting = true
> agent.sinks.elastic-sink.channel = channel-elastic
> agent.sinks.elastic-sink.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
> agent.sinks.elastic-sink.hostNames = 127.0.0.1:9300
> agent.sinks.elastic-sink.indexName = flume_index
> agent.sinks.elastic-sink.indexType = logs_type
> agent.sinks.elastic-sink.clusterName = elasticsearch
> agent.sinks.elastic-sink.batchSize = 10
> agent.sinks.elastic-sink.ttl = 5d
> agent.sinks.elastic-sink.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
> # Finally, activate.
> agent.channels = channel-elastic
> agent.sources = tail-source
> agent.sinks =  elastic-sink
> 2) run it
> 3) look at the elastic index which was created:
> {
>   "state": "open",
>   "settings": {
>     "index": {
>       "creation_date": "1441728286466",
>       "number_of_shards": "5",
>       "number_of_replicas": "1",
>       "version": {
>         "created": "1070199"
>       },
>       "uuid": "9u-OCPxoQHWwURHyxh15lA"
>     }
>   },
>   "mappings": {
>     "logs_type": {
>       "properties": {
>         "body": {
>           "type": "string"
>         },
>         "timestamp": {
>           "type": "string"
>         }
>       }
>     }
>   },
>   "aliases": [ ]
> }



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)