You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 刘海 <li...@163.com> on 2021/01/20 02:27:34 UTC

flinksql 将计算结果写入到hbase数据不一致

Hi All !
使用 Flink 1.12,在本地 IDEA 中运行以下代码结果正确,但将代码打包成 jar 发布到 YARN 上运行后结果不正确:数据变少了,本来应该有 325 条,结果只有一百多条。希望各位给点排查建议。
以下是代码:
/**
 * Flink SQL job that reads two MySQL CDC tables ({@code tm_dealers} and
 * {@code new_clue_list}), aggregates lead counts per dealer and writes the
 * results into two HBase tables through a single {@link StatementSet}.
 */
public class FlinkTestDemo {

    /**
     * SQL expression that builds the HBase row key: the leading characters of
     * MD5(dealer_code), the dealer code itself and the current day.
     *
     * <p>The day pattern must be {@code yyyyMMdd}. In Java date patterns
     * {@code YYYY} is the week-based year and {@code DD} is the day of the
     * year, so the previously used {@code 'YYYYMMDD'} produced wrong day
     * suffixes (for example day-of-year 32 on the 1st of February).
     */
    private static final String ROWKEY_EXPR =
            "CONCAT_WS('',SUBSTRING(MD5(t1.dealer_code) FROM 0  FOR 6 ),t1.dealer_code,"
                    + "FROM_UNIXTIME(UNIX_TIMESTAMP(),'yyyyMMdd'))";

    /**
     * Builds the table environment, registers the source and sink tables and
     * submits both INSERT statements as one job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be built or submitted
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);

        // NOTE(review): only the externalized-checkpoint cleanup mode is set here; no
        // checkpoint interval is configured in this class - presumably it comes from the
        // cluster configuration. Confirm that checkpointing is actually enabled on YARN.
        CheckpointConfig config = env.getCheckpointConfig();
        config.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Set low-level key-value options: enable mini-batch aggregation.
        Configuration configuration = bsTableEnv.getConfig().getConfiguration();
        configuration.setString("table.exec.mini-batch.enabled", "true");
        configuration.setString("table.exec.mini-batch.allow-latency", "6000");
        configuration.setString("table.exec.mini-batch.size", "5000");

        registerSourceTables(bsTableEnv);
        registerSinkTables(bsTableEnv);

        // Multiple INSERT statements must be executed through a StatementSet.
        StatementSet stmtSet = bsTableEnv.createStatementSet();
        stmtSet.addInsertSql(invalidClueSummaryInsert());
        stmtSet.addInsertSql(clueSummaryInsert());
        stmtSet.execute();
    }

    /**
     * Registers the two mysql-cdc source tables.
     *
     * NOTE(review): host names and credentials are hard-coded in the DDL;
     * consider moving them to external configuration.
     */
    private static void registerSourceTables(StreamTableEnvironment bsTableEnv) {
        // Dealer table (mysql-cdc source).
        bsTableEnv.executeSql(
                "CREATE TABLE tm_dealers (dealer_code STRING,is_valid DECIMAL(10,0),proctime AS PROCTIME(),PRIMARY KEY (dealer_code) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + " 'connector' = 'mysql-cdc',\n"
                        + " 'hostname' = '10.0.15.83',\n"
                        + " 'port' = '3306',\n"
                        + " 'username' = 'root',\n"
                        + " 'password' = 'Cdh2020:1',\n"
                        + " 'database-name' = 'flink-test',\n"
                        + " 'table-name' = 'tm_dealers'\n"
                        + ")");

        // Customer-acquisition management metrics - start.
        // Online-sales lead table (mysql-cdc source).
        bsTableEnv.executeSql(
                "CREATE TABLE new_clue_list_cdc (\n"
                        + "entity_code STRING,data_source STRING,clue_level STRING,customer_no STRING,fail_apply_time timestamp,clue_fail_type INT,\n"
                        + "PRIMARY KEY (customer_no) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + " 'connector' = 'mysql-cdc',\n"
                        + " 'hostname' = '10.0.17.224',\n"
                        + " 'port' = '3306',\n"
                        + " 'username' = 'liuhai',\n"
                        + " 'password' = 'liuhai_2020',\n"
                        + " 'database-name' = 'wk_mall_service',\n"
                        + " 'table-name' = 'new_clue_list'\n"
                        + ")");
    }

    /**
     * Registers the two HBase sink tables.
     *
     * The 'sink.buffer-flush.max-size' / 'sink.buffer-flush.max-rows' /
     * 'sink.buffer-flush.interval' options were left disabled in the original
     * DDL and are still not set here.
     *
     * NOTE(review): both queries emit a changelog keyed by ROWKEY (outer join on
     * top of an aggregation). If the sink runs with parallelism greater than 1,
     * updates and deletes for the same ROWKEY may presumably reach different sink
     * subtasks and be applied out of order - verify, and consider pinning the
     * sink parallelism, when rows go missing on the cluster.
     */
    private static void registerSinkTables(StreamTableEnvironment bsTableEnv) {
        // Summary-layer HBase table: lightly aggregated middle layer, online-sales
        // data domain - invalid online-sales leads summarized by source channel.
        bsTableEnv.executeSql(
                "CREATE TABLE DWM_NETSALES_WXXSALYQDWDWXXSHZB_SS_I_HBASE ("
                        + "ROWKEY VARCHAR,"
                        + "F1 ROW(ENTITY_CODE VARCHAR,"
                        + "ZWXXS VARCHAR,"
                        + "QCZJWXZBXS VARCHAR,"
                        + "YCWXZBXS VARCHAR,"
                        + "TPYWXZBXS VARCHAR,"
                        + "AKWXZBXS VARCHAR,"
                        + "DCDWXZBXS VARCHAR,"
                        + "SUM_TIME VARCHAR),"
                        + "PRIMARY KEY (ROWKEY) NOT ENFORCED"
                        + ") WITH ("
                        + "    'connector' = 'hbase-2.2',"
                        + "    'table-name' = 'DWM:DWM_NETSALES_WXXSALYQDWDWXXSHZB_SS_I_HBASE',"
                        + "    'zookeeper.quorum' = '10.0.15.83:2181',"
                        + "    'zookeeper.znode.parent' = '/hbase'"
                        + ")");

        // Summary-layer HBase table: online-sales leads summarized by source channel.
        bsTableEnv.executeSql(
                "CREATE TABLE DWM_NETSALES_WXXSALYQDWDHZB_SS_I_HBASE ("
                        + "ROWKEY VARCHAR,"
                        + "F1 ROW(ENTITY_CODE VARCHAR,ZXS VARCHAR, QCZJXS VARCHAR, QCZJSBXS VARCHAR, QCZJZXS VARCHAR,  YCXS VARCHAR,"
                        + " YCSBXS VARCHAR, YCZXS VARCHAR, TPYXS VARCHAR, TPYSBXS VARCHAR, TPYZXS VARCHAR,  AKXS VARCHAR, AKSBXS VARCHAR,"
                        + " AKZXS VARCHAR,  DCDXS VARCHAR, DCDSBXS VARCHAR, DCDZXS VARCHAR, SUM_TIME VARCHAR),"
                        + "PRIMARY KEY (ROWKEY) NOT ENFORCED "
                        + ") WITH ("
                        + "    'connector' = 'hbase-2.2',"
                        + "    'table-name' = 'DWM:DWM_NETSALES_WXXSALYQDWDHZB_SS_I_HBASE',"
                        + "    'zookeeper.quorum' = '10.0.15.83:2181',"
                        + "    'zookeeper.znode.parent' = '/hbase'"
                        + ")");
    }

    /**
     * Returns the INSERT statement for the invalid-lead summary table: every
     * valid dealer is left-joined with its per-channel counts of leads that
     * have a fail_apply_time and clue_fail_type = 20; missing counts become '0'.
     */
    private static String invalidClueSummaryInsert() {
        return "INSERT INTO DWM_NETSALES_WXXSALYQDWDWXXSHZB_SS_I_HBASE "
                + "SELECT "
                + "ROWKEY,ROW(dealer_code,ZWXXS,QCZJWXZBXS,YCWXZBXS,TPYWXZBXS,AKWXZBXS,DCDWXZBXS,SUM_TIME) as F1 "
                + "FROM "
                + "(SELECT " + ROWKEY_EXPR + " AS ROWKEY,"
                + "t1.dealer_code,"
                + "IF(t2.ZWXXS IS NULL, '0',t2.ZWXXS) AS ZWXXS,"
                + "IF(t2.QCZJWXZBXS IS NULL, '0',t2.QCZJWXZBXS) AS QCZJWXZBXS,"
                + "IF(t2.YCWXZBXS IS NULL, '0',t2.YCWXZBXS) AS YCWXZBXS,"
                + "IF(t2.TPYWXZBXS IS NULL, '0',t2.TPYWXZBXS) AS TPYWXZBXS,"
                + "IF(t2.AKWXZBXS IS NULL, '0',t2.AKWXZBXS) AS AKWXZBXS,"
                + "IF(t2.DCDWXZBXS IS NULL, '0',t2.DCDWXZBXS) AS DCDWXZBXS,"
                + "FROM_UNIXTIME(UNIX_TIMESTAMP()) AS SUM_TIME "
                + "FROM "
                + "tm_dealers AS t1 "
                + "LEFT JOIN "
                + "(SELECT "
                + "C.entity_code, "
                + "CAST(COUNT(1) AS STRING) AS ZWXXS,"
                + "CAST(COUNT(CASE WHEN C.data_source = '20' OR C.data_source = '100' THEN 1 END )  AS STRING) AS QCZJWXZBXS,"
                + "CAST(COUNT( CASE WHEN C.data_source = '40' OR C.data_source = '30' THEN 1 END )  AS STRING) AS YCWXZBXS,"
                + "CAST(COUNT( CASE WHEN C.data_source = '10' OR C.data_source = '80' THEN 1 END )  AS STRING) AS TPYWXZBXS,"
                + "CAST(COUNT( CASE WHEN C.data_source = '60' OR C.data_source = '50' THEN 1 END )  AS STRING) AS AKWXZBXS,"
                + "CAST(COUNT( CASE WHEN C.data_source = '130' OR C.data_source = '140' THEN 1 END )  AS STRING) AS DCDWXZBXS "
                + "FROM "
                + "new_clue_list_cdc AS C "
                + "WHERE "
                + "C.fail_apply_time IS NOT NULL AND C.clue_fail_type = 20 GROUP BY C.entity_code) AS t2 "
                + "ON t1.dealer_code = t2.entity_code  WHERE t1.is_valid=12781001)";
    }

    /**
     * Returns the INSERT statement for the lead summary table: leads with one
     * of the listed clue levels are first counted per (entity_code, customer_no)
     * and data source, then summed per entity_code and left-joined onto every
     * valid dealer; missing values become '0'.
     */
    private static String clueSummaryInsert() {
        return "INSERT INTO DWM_NETSALES_WXXSALYQDWDHZB_SS_I_HBASE "
                + "SELECT "
                + "ROWKEY,ROW(dealer_code,ZXS,QCZJXS,QCZJSBXS,QCZJZXS,YCXS,YCSBXS,YCZXS,TPYXS,TPYSBXS,"
                + "TPYZXS,AKXS,AKSBXS,AKZXS,DCDXS,DCDSBXS,DCDZXS,SUM_TIME) as F1 "
                + "FROM "
                + "(SELECT " + ROWKEY_EXPR + " AS ROWKEY,"
                + "t1.dealer_code,"
                + "IF(t2.ZXS IS NULL, '0',t2.ZXS) AS ZXS,"
                + "IF(t2.QCZJXS IS NULL, '0',t2.QCZJXS) AS QCZJXS,"
                + "IF(t2.QCZJSBXS IS NULL, '0',t2.QCZJSBXS) AS QCZJSBXS,"
                + "IF(t2.QCZJZXS IS NULL, '0',t2.QCZJZXS) AS QCZJZXS,"
                + "IF(t2.YCXS IS NULL, '0',t2.YCXS) AS YCXS,"
                + "IF(t2.YCSBXS IS NULL, '0',t2.YCSBXS) AS YCSBXS,"
                + "IF(t2.YCZXS IS NULL, '0',t2.YCZXS) AS YCZXS,"
                + "IF(t2.TPYXS IS NULL, '0',t2.TPYXS) AS TPYXS,"
                + "IF(t2.TPYSBXS IS NULL, '0',t2.TPYSBXS) AS TPYSBXS,"
                + "IF(t2.TPYZXS IS NULL, '0',t2.TPYZXS) AS TPYZXS,"
                + "IF(t2.AKXS IS NULL, '0',t2.AKXS) AS AKXS,"
                + "IF(t2.AKSBXS IS NULL, '0',t2.AKSBXS) AS AKSBXS,"
                + "IF(t2.AKZXS IS NULL, '0',t2.AKZXS) AS AKZXS,"
                + "IF(t2.DCDXS IS NULL, '0',t2.DCDXS) AS DCDXS,"
                + "IF(t2.DCDSBXS IS NULL, '0',t2.DCDSBXS) AS DCDSBXS,"
                + "IF(t2.DCDZXS IS NULL, '0',t2.DCDZXS) AS DCDZXS,"
                + "FROM_UNIXTIME(UNIX_TIMESTAMP()) AS SUM_TIME "
                + "FROM "
                + "tm_dealers AS t1 "
                + "LEFT JOIN "
                + "(SELECT "
                + "t.entity_code,"
                + "CAST(count(*) AS STRING) AS ZXS,"
                + "CAST(SUM(t.QCZJ_XS_LM) AS STRING) AS QCZJXS,"
                + "CAST(SUM(t.QCZJ_SB_LM) AS STRING) AS QCZJSBXS,"
                + "CAST(SUM(t.QCZJ_XS_LM)+SUM(t.QCZJ_SB_LM) AS STRING) AS QCZJZXS,"
                + "CAST(SUM(t.YC_XS_LM) AS STRING) AS YCXS,"
                + "CAST(SUM(t.YC_SB_LM) AS STRING) AS YCSBXS,"
                + "CAST(SUM(t.YC_XS_LM)+SUM(t.YC_SB_LM) AS STRING) AS YCZXS,"
                + "CAST(SUM(t.TPY_XS_LM) AS STRING) AS TPYXS,"
                + "CAST(SUM(t.TPY_SB_LM) AS STRING) AS TPYSBXS,"
                + "CAST(SUM(t.TPY_XS_LM)+SUM(t.TPY_SB_LM) AS STRING) AS TPYZXS,"
                + "CAST(SUM(t.AK_XS_LM) AS STRING) AS AKXS,"
                + "CAST(SUM(t.AK_SB_LM) AS STRING) AS AKSBXS,"
                + "CAST(SUM(t.AK_XS_LM)+SUM(t.AK_SB_LM) AS STRING) AS AKZXS,"
                + "CAST(SUM(t.DCD_XS_LM) AS STRING) AS DCDXS,"
                + "CAST(SUM(t.DCD_SB_LM) AS STRING) AS DCDSBXS,"
                + "CAST(SUM(t.DCD_XS_LM)+SUM(t.DCD_SB_LM) AS STRING) AS DCDZXS "
                + "FROM "
                + "(SELECT "
                + "C.entity_code,"
                + "COUNT(*) AS NUM_2_LM,"
                + "COUNT( CASE WHEN C.data_source = '20' THEN 1 END ) AS QCZJ_XS_LM,"
                + "COUNT( CASE WHEN C.data_source = '100' THEN 1 END ) AS QCZJ_SB_LM,"
                + "COUNT( CASE WHEN C.data_source = '40' THEN 1 END ) AS YC_XS_LM,"
                + "COUNT( CASE WHEN C.data_source = '30' THEN 1 END ) AS YC_SB_LM,"
                + "COUNT( CASE WHEN C.data_source = '10' THEN 1 END ) AS TPY_XS_LM,"
                + "COUNT( CASE WHEN C.data_source = '80' THEN 1 END ) AS TPY_SB_LM,"
                + "COUNT( CASE WHEN C.data_source = '60' THEN 1 END ) AS AK_XS_LM,"
                + "COUNT( CASE WHEN C.data_source = '50' THEN 1 END ) AS AK_SB_LM,"
                + "COUNT( CASE WHEN C.data_source = '130' THEN 1 END ) AS DCD_XS_LM,"
                + "COUNT( CASE WHEN C.data_source = '140' THEN 1 END ) AS DCD_SB_LM  "
                + "FROM "
                + "new_clue_list_cdc AS C  "
                + "WHERE "
                + "((C.clue_level IN ('13101007','13101006')) OR C.clue_level IN ('13101001','13101002','13101003','13101004')) "
                + "GROUP BY "
                + "C.entity_code,C.customer_no "
                + ") AS t "
                + "GROUP BY t.entity_code ) AS t2 ON t1.dealer_code = t2.entity_code  WHERE t1.is_valid=12781001)";
    }
}
| |
刘海
|
|
liuhai35@163.com
|
签名由网易邮箱大师定制