Posted to issues@flink.apache.org by "Jark Wu (Jira)" <ji...@apache.org> on 2020/02/28 05:15:00 UTC

[jira] [Assigned] (FLINK-15400) elasticsearch sink support dynamic index.

     [ https://issues.apache.org/jira/browse/FLINK-15400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu reassigned FLINK-15400:
-------------------------------

    Assignee: Leonard Xu

> elasticsearch sink support dynamic index.
> -----------------------------------------
>
>                 Key: FLINK-15400
>                 URL: https://issues.apache.org/jira/browse/FLINK-15400
>             Project: Flink
>          Issue Type: New Feature
>          Components: Connectors / ElasticSearch, Table SQL / Ecosystem
>    Affects Versions: 1.11.0
>            Reporter: ouyangwulin
>            Assignee: Leonard Xu
>            Priority: Major
>              Labels: usability
>             Fix For: 1.11.0
>
>
> From user-zh@flink.apache.org ([https://lists.apache.org/thread.html/ac4e0c068baeb3b070f0213a2e1314e6552b226b8132a4c49d667ecd%40%3Cuser-zh.flink.apache.org%3E]): because Elasticsearch 6/7 does not support TTL, users have to clean up indices by timestamp themselves, so a dynamic index would be a useful feature. The initial proposal adds the property 'dynamicIndex' as a switch to enable the dynamic index, the property 'indexField' to name the field the time is extracted from, and the property 'indexInterval' to set the index cycle mode.
>  
> ||With property||Description||Default||Required||
> |dynamicIndex|Whether the index is dynamic|false (true/false)|false|
> |indexField|Field the index is extracted from|none|Required when dynamicIndex is true; only the types "timestamp", "date" and "long" are supported|
> |indexInterval|Index cycle mode|d|Required when dynamicIndex is true; the allowed values are:
>  d: day
>  m: month
>  w: week|
>  
> After discussion, the final design looks as follows:
> {code:java}
> CREATE TABLE es_sink_table (
>   log_source varchar,
>   log_content varchar,
>   log_level bigint,
>   log_ts timestamp
> ) WITH (
>   'connector.type' = 'elasticsearch',
>   'connector.version' = '6',
>   -- Elasticsearch index name. Flink supports creating the index dynamically
>   -- at runtime based on a field. When the field type is varchar, the index
>   -- value comes from the dynamicIndex pattern, e.g. 'my-log-{log_source}'.
>   -- When the field type is timestamp/date, the dynamicIndex pattern supports
>   -- formatting and parsing the date with Java SimpleDateFormat,
>   -- e.g. 'my-log-{log_ts|yyyy-MM-dd}'.
>   'connector.index' = 'my-log-{log_ts|yyyy-MM-dd}',
>   -- Index alias name. The alias maps to all indices that are
>   -- created from 'connector.index'.
>   'connector.index-alias' = 'my-log',
>   …
> )
> {code}
>  
>  
>  
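
To illustrate how a pattern such as 'my-log-{log_ts|yyyy-MM-dd}' could be turned into a concrete index name per row, here is a minimal sketch. It is not the connector's implementation: the class DynamicIndexSketch, the method resolve and the Map-based row are hypothetical names chosen for this example, and it formats dates with java.time.DateTimeFormatter rather than the SimpleDateFormat mentioned in the description.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, for illustration only; not part of Flink.
public class DynamicIndexSketch {

    // Matches one placeholder: {field} or {field|dateFormat}.
    private static final Pattern PLACEHOLDER =
            Pattern.compile("\\{([^|{}]+)(?:\\|([^{}]+))?\\}");

    // Replaces every placeholder in indexPattern with the value of the
    // named field in row (assumed here to be a simple field-name map).
    public static String resolve(String indexPattern, Map<String, Object> row) {
        Matcher m = PLACEHOLDER.matcher(indexPattern);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String field = m.group(1).trim();
            String format = m.group(2); // null when no "|format" part is given
            Object value = row.get(field);
            if (value == null) {
                throw new IllegalArgumentException("No value for field: " + field);
            }
            String text;
            if (format != null) {
                // A date format only makes sense for temporal values.
                if (!(value instanceof TemporalAccessor)) {
                    throw new IllegalArgumentException(
                            "Field is not a date/time value: " + field);
                }
                text = DateTimeFormatter.ofPattern(format.trim())
                        .format((TemporalAccessor) value);
            } else {
                text = String.valueOf(value);
            }
            // quoteReplacement keeps '$' and '\' in the value literal.
            m.appendReplacement(out, Matcher.quoteReplacement(text));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        row.put("log_source", "nginx");
        row.put("log_ts", LocalDateTime.of(2020, 2, 28, 5, 15));
        System.out.println(resolve("my-log-{log_ts|yyyy-MM-dd}", row));
        System.out.println(resolve("my-log-{log_source}", row));
    }
}
```

With the sample row above, the first pattern resolves to a per-day index name and the second to a per-source one, so old data can be dropped by deleting whole indices.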



--
This message was sent by Atlassian Jira
(v8.3.4#803005)