You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Martijn Visser (Jira)" <ji...@apache.org> on 2023/03/30 12:57:00 UTC

[jira] [Commented] (FLINK-30809) flink-connector-elasticsearch7 updates data pipeline does not work

    [ https://issues.apache.org/jira/browse/FLINK-30809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17706868#comment-17706868 ] 

Martijn Visser commented on FLINK-30809:
----------------------------------------

How have you determined that the problem is the Elasticsearch sink, and not the MySQL CDC source? Is updated record in MySQL actually visible in your test? 

> flink-connector-elasticsearch7 updates data pipeline does not work
> ------------------------------------------------------------------
>
>                 Key: FLINK-30809
>                 URL: https://issues.apache.org/jira/browse/FLINK-30809
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / ElasticSearch
>    Affects Versions: elasticsearch-3.0.0
>         Environment: Flink Version: 1.15.3
> Flink-CDC Version: 2.3.0
> Mysql Version: 5.7
> Elasticsearch Version: 7.17.7
> During the test, these jar packages were added under flink/lib:
> flink-sql-connector-elasticsearch7-1.15.3.jar
> flink-sql-connector-mysql-cdc-2.3.0.jar
> mysql-connector-java-8.0.27.jar
>            Reporter: iduanyingjie
>            Priority: Major
>
> create elasticsearch in docker
> {code:yaml}
> version: '2.1'
> services:
>  elasticsearch:
>    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.7
>    environment:
>      - cluster.name=docker-cluster
>      - bootstrap.memory_lock=true
>      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
>      - discovery.type=single-node
>    ports:
>      - "9200:9200"
>      - "9300:9300"
>    ulimits:
>      memlock:
>        soft: -1
>        hard: -1
>      nofile:
>        soft: 65536
>        hard: 65536
>  kibana:
>    image: docker.elastic.co/kibana/kibana:7.17.7
>    ports:
>      - "5601:5601"
> {code}
> create table: records in mysql
> {code:sql}
> CREATE TABLE records (
>  id bigint unsigned NOT NULL AUTO_INCREMENT,
>  user_id bigint unsigned NOT NULL,
>  create_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
>  PRIMARY KEY (id)
> ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
> {code}
> insert some datas
> {code:sql}
> INSERT INTO test.records (id, user_id, create_time) VALUES(default, 123, '2023-01-20 12:25:11');
> INSERT INTO test.records (id, user_id, create_time) VALUES(default, 456, '2023-01-20 12:25:30');
> INSERT INTO test.records (id, user_id, create_time) VALUES(default, 789, '2023-01-20 12:25:37');
> {code}
> create pipeline in es:
> {code:java}
> PUT /_ingest/pipeline/set_ingest_timestamp_fields
> {
>  "processors": [
>    {
>      "set": {
>        "field": "ingest_timestamp",
>        "value": "{{_ingest.timestamp}}"
>      }
>    }
>  ]
> }{code}
> create index in es:
> {code:java}
> PUT enriched_records
> {
>  "settings": {
>    "default_pipeline": "set_ingest_timestamp_fields",
>    "number_of_shards": "1",
>    "number_of_replicas": "0"
>  }
> }{code}
> excute flink sql:
> {code:sql}
> CREATE TABLE records (
>    id INT,
>    user_id INT,
>    create_time TIMESTAMP(3),
>    proc_time AS PROCTIME(),
>    operation_time TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
>    PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>    'connector' = 'mysql-cdc',
>    'hostname' = 'localhost',
>    'port' = '3306',
>    'username' = 'root',
>    'password' = '123456',
>    'database-name' = 'test',
>    'table-name' = 'records',
>    'server-time-zone' = 'UTC'
> );
> CREATE TABLE enriched_records (
>    id INT,
>    user_id INT,
>    create_time TIMESTAMP(3),
>    proc_time TIMESTAMP_LTZ(3),
>    operation_time TIMESTAMP_LTZ(3),
>    PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>    'connector' = 'elasticsearch-7',
>    'hosts' = 'http://localhost:9200',
>    'index' = 'enriched_records'
> );
> INSERT INTO enriched_records
> SELECT
>    o.id,
>    o.user_id,
>    o.create_time,
>    o.proc_time,
>    o.operation_time
> FROM records AS o; 
> {code}
> We query the data in Elasticsearch use GET /enriched_records/_search and we find that each record has an ingest_timestamp field and the value is the recent time.
> {code:json}
> {
>     "_index":"enriched_records",
>     "_type":"_doc",
>     "_id":"3",
>     "_score":1,
>     "_source":{
>         "operation_time":"1970-01-01 00:00:00Z",
>         "create_time":"2023-01-20 12:25:37",
>         "user_id":789,
>         "ingest_timestamp":"2023-01-28T05:21:40.539754251Z",
>         "id":3,
>         "proc_time":"2023-01-28 05:21:40.233Z"
>     }
> } {code}
> When we modify a record in MySQL, the value of the ingest_timestamp field does not change, and it seems that the pipeline set for this index is not working at this moment.
> {code:json}
> {
>     "_index":"enriched_records",
>     "_type":"_doc",
>     "_id":"3",
>     "_score":1,
>     "_source":{
>         "operation_time":"2023-01-28 05:25:05Z",
>         "create_time":"2023-01-20 12:25:37",
>         "user_id":987,
>         "ingest_timestamp":"2023-01-28T05:21:40.539754251Z",
>         "id":3,
>         "proc_time":"2023-01-28 05:25:05.529Z"
>     }
> }
> {code}
> If we directly modify a field in Elasticsearch, we can find that the value of the ingest_timestamp field will change.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)