You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Martijn Visser (Jira)" <ji...@apache.org> on 2022/01/07 07:35:00 UTC
[jira] [Commented] (FLINK-25560) Add "sink.delete.mode" in HBase sql connector for retracting the latest version or all versions in changelog mode
[ https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17470393#comment-17470393 ]
Martijn Visser commented on FLINK-25560:
----------------------------------------
[~jingge] Will you take a look at this one?
> Add "sink.delete.mode" in HBase sql connector for retracting the latest version or all versions in changelog mode
> -----------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-25560
> URL: https://issues.apache.org/jira/browse/FLINK-25560
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / HBase
> Affects Versions: 1.15.0
> Reporter: Bruce Wong
> Priority: Major
> Labels: pull-request-available
>
> h1. Motivation
> When we synchronize data from MySQL to HBase, we find that when a row is deleted in MySQL, HBase does not delete all versions of that row, which leads to incorrect semantics. We therefore want to add a parameter that controls whether a delete removes only the latest version or all versions.
> h1. Usage
> The test code is as follows.
> {code:java}
> package com.bruce;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableConfig;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import static org.apache.flink.configuration.ConfigConstants.LOCAL_START_WEBSERVER;
> /**
>  * Local test job for the {@code sink.delete.mode} option proposed in this issue: reads a
>  * Debezium-JSON changelog from the Kafka topic {@code dim-bundles} and writes it to the HBase
>  * table {@code dim_hbase}, using {@code upc} as the row key.
>  */
> public class KafkaToHBase {
>     /**
>      * Creates a local streaming environment with the web UI enabled, registers the Kafka source
>      * and HBase sink tables, and executes the INSERT statement that connects them.
>      *
>      * @param args unused
>      */
>     public static void main(String[] args) {
>         Configuration cfg = new Configuration();
>         cfg.setBoolean(LOCAL_START_WEBSERVER, true);
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(cfg);
>         env.setParallelism(1);
>         EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
>                 .inStreamingMode()
>                 .build();
>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings);
>         // Optional idle-state retention; enabling it also requires importing java.time.Duration.
>         // TableConfig config = tEnv.getConfig();
>         // config.setIdleStateRetention(Duration.ofHours(2));
>
>         // `name` and `desc` are backquoted because they collide with SQL keywords; the sink DDL
>         // and the SELECT below quote them the same way.
>         String source = "CREATE TEMPORARY TABLE IF NOT EXISTS kafka_llspay_bundles(\n" +
>                 " id STRING,\n" +
>                 " category_id STRING,\n" +
>                 " upc STRING,\n" +
>                 " `name` STRING,\n" +
>                 " price_cents STRING,\n" +
>                 " original_price_cents STRING,\n" +
>                 " short_desc STRING,\n" +
>                 " `desc` STRING,\n" +
>                 " cover_url STRING,\n" +
>                 " created_at STRING,\n" +
>                 " updated_at STRING,\n" +
>                 " deleted_at STRING,\n" +
>                 " extra STRING,\n" +
>                 " status STRING,\n" +
>                 " scholarship_cents STRING,\n" +
>                 " is_payback STRING,\n" +
>                 " is_support_iap STRING,\n" +
>                 " iap_product_id STRING,\n" +
>                 " neo_product_code STRING,\n" +
>                 " paid_redirect_url STRING,\n" +
>                 " subscription_type STRING\n" +
>                 ") WITH (\n" +
>                 " 'connector' = 'kafka',\n" +
>                 " 'topic' = 'dim-bundles',\n" +
>                 " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
>                 " 'properties.group.id' = 'vvp_dev',\n" +
>                 " 'scan.startup.mode' = 'latest-offset',\n" +
>                 " 'value.debezium-json.schema-include' = 'true',\n" +
>                 " 'value.format' = 'debezium-json',\n" +
>                 " 'value.debezium-json.ignore-parse-errors' = 'true'\n" +
>                 ")";
>
>         // The sink flushes after every row (max-rows = 1) and sets the proposed
>         // 'sink.delete.mode' option to 'all-versions'.
>         String sink = "CREATE TEMPORARY TABLE IF NOT EXISTS dim_hbase (\n" +
>                 " rowkey STRING,\n" +
>                 " cf ROW<id STRING, category_id STRING, upc STRING, `name` STRING, price_cents STRING, original_price_cents STRING, short_desc STRING, `desc` STRING, cover_url STRING, created_at STRING, updated_at STRING, deleted_at STRING, extra STRING, status STRING, scholarship_cents STRING, is_payback STRING, is_support_iap STRING, iap_product_id STRING, neo_product_code STRING, paid_redirect_url STRING, subscription_type STRING>\n" +
>                 ") with (\n" +
>                 " 'connector'='hbase-2.2',\n" +
>                 " 'table-name'='dim_hbase',\n" +
>                 " 'zookeeper.quorum'='localhost:2181',\n" +
>                 " 'sink.buffer-flush.max-size' = '0',\n" +
>                 " 'sink.buffer-flush.max-rows' = '1',\n" +
>                 " 'sink.delete.mode' = 'all-versions'\n" +
>                 ")";
>
>         // All source columns are packed into the single column family `cf`; upc is the row key.
>         String dml = "INSERT INTO dim_hbase\n" +
>                 "SELECT \n" +
>                 " upc as rowkey,\n" +
>                 " ROW(\n" +
>                 " id, category_id, upc, `name`, price_cents, original_price_cents, short_desc, `desc` , cover_url , created_at, updated_at, deleted_at, extra , status , scholarship_cents , is_payback , is_support_iap , iap_product_id , neo_product_code , paid_redirect_url , subscription_type)\n" +
>                 "FROM kafka_llspay_bundles";
>
>         // Both tables must be registered before the INSERT that references them.
>         tEnv.executeSql(source);
>         tEnv.executeSql(sink);
>         tEnv.executeSql(dml);
>     }
> } {code}
> h1. Reference
> Please look at the following task link.
> FLINK-25330
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)