Posted to issues@flink.apache.org by "Martijn Visser (Jira)" <ji...@apache.org> on 2023/03/30 12:57:00 UTC
[jira] [Commented] (FLINK-30809) flink-connector-elasticsearch7 updates data pipeline does not work
[ https://issues.apache.org/jira/browse/FLINK-30809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17706868#comment-17706868 ]
Martijn Visser commented on FLINK-30809:
----------------------------------------
How have you determined that the problem is the Elasticsearch sink, and not the MySQL CDC source? Is the updated record in MySQL actually visible in your test?
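One way to check that, as a sketch (the table records_print below is a hypothetical debug sink, not part of the reported setup): route the same CDC source into a print sink and watch the TaskManager log for -U/+U changelog rows when the MySQL row is updated.

{code:sql}
-- Hypothetical debug sink: the print connector logs every changelog row it receives.
CREATE TABLE records_print (
  id INT,
  user_id INT,
  create_time TIMESTAMP(3)
) WITH (
  'connector' = 'print'
);

-- If the CDC source captures the update, a -U/+U pair should appear in the log.
INSERT INTO records_print SELECT id, user_id, create_time FROM records;
{code}

If no -U/+U rows show up, the problem is on the source side rather than in the Elasticsearch sink.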
> flink-connector-elasticsearch7 updates data pipeline does not work
> ------------------------------------------------------------------
>
> Key: FLINK-30809
> URL: https://issues.apache.org/jira/browse/FLINK-30809
> Project: Flink
> Issue Type: Bug
> Components: Connectors / ElasticSearch
> Affects Versions: elasticsearch-3.0.0
> Environment: Flink Version: 1.15.3
> Flink-CDC Version: 2.3.0
> Mysql Version: 5.7
> Elasticsearch Version: 7.17.7
> During the test, these jar packages were added under flink/lib:
> flink-sql-connector-elasticsearch7-1.15.3.jar
> flink-sql-connector-mysql-cdc-2.3.0.jar
> mysql-connector-java-8.0.27.jar
> Reporter: iduanyingjie
> Priority: Major
>
> Create Elasticsearch in Docker:
> {code:yaml}
> version: '2.1'
> services:
>   elasticsearch:
>     image: docker.elastic.co/elasticsearch/elasticsearch:7.17.7
>     environment:
>       - cluster.name=docker-cluster
>       - bootstrap.memory_lock=true
>       - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
>       - discovery.type=single-node
>     ports:
>       - "9200:9200"
>       - "9300:9300"
>     ulimits:
>       memlock:
>         soft: -1
>         hard: -1
>       nofile:
>         soft: 65536
>         hard: 65536
>   kibana:
>     image: docker.elastic.co/kibana/kibana:7.17.7
>     ports:
>       - "5601:5601"
> {code}
> Create the table records in MySQL:
> {code:sql}
> CREATE TABLE records (
>   id bigint unsigned NOT NULL AUTO_INCREMENT,
>   user_id bigint unsigned NOT NULL,
>   create_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
>   PRIMARY KEY (id)
> ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
> {code}
> Insert some data:
> {code:sql}
> INSERT INTO test.records (id, user_id, create_time) VALUES(default, 123, '2023-01-20 12:25:11');
> INSERT INTO test.records (id, user_id, create_time) VALUES(default, 456, '2023-01-20 12:25:30');
> INSERT INTO test.records (id, user_id, create_time) VALUES(default, 789, '2023-01-20 12:25:37');
> {code}
> Create an ingest pipeline in Elasticsearch:
> {code:java}
> PUT /_ingest/pipeline/set_ingest_timestamp_fields
> {
>   "processors": [
>     {
>       "set": {
>         "field": "ingest_timestamp",
>         "value": "{{_ingest.timestamp}}"
>       }
>     }
>   ]
> }
> {code}
> Create the index in Elasticsearch:
> {code:java}
> PUT enriched_records
> {
>   "settings": {
>     "default_pipeline": "set_ingest_timestamp_fields",
>     "number_of_shards": "1",
>     "number_of_replicas": "0"
>   }
> }
> {code}
> Execute the Flink SQL:
> {code:sql}
> CREATE TABLE records (
>   id INT,
>   user_id INT,
>   create_time TIMESTAMP(3),
>   proc_time AS PROCTIME(),
>   operation_time TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>   'connector' = 'mysql-cdc',
>   'hostname' = 'localhost',
>   'port' = '3306',
>   'username' = 'root',
>   'password' = '123456',
>   'database-name' = 'test',
>   'table-name' = 'records',
>   'server-time-zone' = 'UTC'
> );
>
> CREATE TABLE enriched_records (
>   id INT,
>   user_id INT,
>   create_time TIMESTAMP(3),
>   proc_time TIMESTAMP_LTZ(3),
>   operation_time TIMESTAMP_LTZ(3),
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>   'connector' = 'elasticsearch-7',
>   'hosts' = 'http://localhost:9200',
>   'index' = 'enriched_records'
> );
>
> INSERT INTO enriched_records
> SELECT
>   o.id,
>   o.user_id,
>   o.create_time,
>   o.proc_time,
>   o.operation_time
> FROM records AS o;
> {code}
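> A note on a possible cause (an assumption about connector internals, not verified here): with a PRIMARY KEY declared on the sink table, the elasticsearch-7 connector runs in upsert mode and may emit bulk update actions for changed rows, and Elasticsearch applies an index's default_pipeline only to index/create operations, not to update actions. Under that assumption, the symptom can be reproduced against Elasticsearch alone:
> {code:java}
> # Index action: expected to run the default pipeline and set ingest_timestamp
> POST /_bulk
> {"index":{"_index":"enriched_records","_id":"100"}}
> {"user_id":1}
>
> # Update action on the same document: the default pipeline is not expected to run,
> # so ingest_timestamp should keep the value set by the index action above
> POST /_bulk
> {"update":{"_index":"enriched_records","_id":"100"}}
> {"doc":{"user_id":2}}
> {code}
> Checking GET /enriched_records/_doc/100 after each bulk call should show whether the update path bypasses the pipeline.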
> We query the data in Elasticsearch with GET /enriched_records/_search and find that each record has an ingest_timestamp field whose value is the recent ingest time.
> {code:json}
> {
>   "_index": "enriched_records",
>   "_type": "_doc",
>   "_id": "3",
>   "_score": 1,
>   "_source": {
>     "operation_time": "1970-01-01 00:00:00Z",
>     "create_time": "2023-01-20 12:25:37",
>     "user_id": 789,
>     "ingest_timestamp": "2023-01-28T05:21:40.539754251Z",
>     "id": 3,
>     "proc_time": "2023-01-28 05:21:40.233Z"
>   }
> }
> {code}
> When we modify a record in MySQL, the value of the ingest_timestamp field does not change; it seems the pipeline configured for this index is not applied at that point.
> {code:json}
> {
>   "_index": "enriched_records",
>   "_type": "_doc",
>   "_id": "3",
>   "_score": 1,
>   "_source": {
>     "operation_time": "2023-01-28 05:25:05Z",
>     "create_time": "2023-01-20 12:25:37",
>     "user_id": 987,
>     "ingest_timestamp": "2023-01-28T05:21:40.539754251Z",
>     "id": 3,
>     "proc_time": "2023-01-28 05:25:05.529Z"
>   }
> }
> {code}
> If we modify a field directly in Elasticsearch, the value of the ingest_timestamp field does change.
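> That observation is consistent with pipeline semantics: re-indexing a whole document runs the index's default pipeline, while a partial update through the _update API does not. As a sketch, using document id 3 from the example above:
> {code:java}
> # Full re-index: expected to run set_ingest_timestamp_fields, so ingest_timestamp changes
> PUT /enriched_records/_doc/3
> {"id": 3, "user_id": 987}
>
> # Partial update: the default pipeline is not applied, so ingest_timestamp keeps its old value
> POST /enriched_records/_update/3
> {"doc": {"user_id": 987}}
> {code}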
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)