Posted to dev@flink.apache.org by "xingyuan cheng (Jira)" <ji...@apache.org> on 2020/04/21 11:54:00 UTC

[jira] [Created] (FLINK-17304) Kafka two streams cannot use Flink SQL to query inner join

xingyuan cheng created FLINK-17304:
--------------------------------------

             Summary: Kafka two streams cannot use Flink SQL to query inner join
                 Key: FLINK-17304
                 URL: https://issues.apache.org/jira/browse/FLINK-17304
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream, Table SQL / API
    Affects Versions: 1.9.0
         Environment: flink.version=1.9.0

scala.binary.version=2.11
            Reporter: xingyuan cheng


In my work I subscribe to DataStreams from two different Kafka topics. The operators on each stream execute correctly on their own, but the join query I run over the two streams through Flink SQL does not return the results I expect. What do I need to do to make it work?

TestStreamSQL.java

```

import java.util.Properties;

import com.alibaba.fastjson.JSONObject;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// BinLogBean is shown below; BinLogUtil is the reporter's own helper class and is not included.
public class TestStreamSQL {
    // was LoggerFactory.getLogger(BinlogStreamSQL.class), which does not compile in this class
    private static final Logger log = LoggerFactory.getLogger(TestStreamSQL.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // env.setStateBackend(new FsStateBackend("hdfs://ido001:8020/user/lwj/flink/checkpoint"));

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        StreamQueryConfig queryConfig = new StreamQueryConfig();
        queryConfig.withIdleStateRetentionTime(Time.days(10), Time.days(30));

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "ido001.gzcb.com:9092,ido002.gzcb.com:9092,ido003.gzcb.com:9092");
        properties.setProperty("group.id", "flink");
        String topic_1 = "bps-16-r3p3";
        String topic_2 = "bps-16-r3p4";

        // Stream 1: binlog records of table app_case -> (case_id, close_time, approve_result)
        DataStreamSource<String> topic1 = env.addSource(new FlinkKafkaConsumer010<String>(topic_1, new SimpleStringSchema(), properties));
        SingleOutputStreamOperator<Tuple3<String, String, String>> kafkaSource1 = topic1.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                try {
                    BinLogBean binLogBean = JSONObject.parseObject(value, BinLogBean.class);
                    return "app_case".equals(binLogBean.getTableName());
                } catch (Exception e) {
                    log.error("JSON parsing failed, str={}", value, e);
                    return false;
                }
            }
        }).map(new MapFunction<String, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(String s) throws Exception {
                BinLogBean binLogBean = JSONObject.parseObject(s, BinLogBean.class);
                String case_id = BinLogUtil.getValueByField(binLogBean, "case_id");
                String close_time = BinLogUtil.getValueByField(binLogBean, "close_time");
                String approve_result = BinLogUtil.getValueByField(binLogBean, "approve_result");
                return new Tuple3<String, String, String>(case_id, close_time, approve_result);
            }
        });
        tEnv.registerDataStream("app_case", kafkaSource1, "case_id, close_time, approve_result");

        // Stream 2: binlog records of table cm_customer -> (case_id, idtfno)
        DataStreamSource<String> topic2 = env.addSource(new FlinkKafkaConsumer010<String>(topic_2, new SimpleStringSchema(), properties));
        SingleOutputStreamOperator<Tuple2<String, String>> kafkaSource2 = topic2.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                try {
                    BinLogBean binLogBean = JSONObject.parseObject(value, BinLogBean.class);
                    return "cm_customer".equals(binLogBean.getTableName());
                } catch (Exception e) {
                    log.error("JSON parsing failed, str={}", value, e);
                    return false;
                }
            }
        }).map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String s) throws Exception {
                BinLogBean binLogBean = JSONObject.parseObject(s, BinLogBean.class);
                String case_id = BinLogUtil.getValueByField(binLogBean, "case_id");
                String idtfno = BinLogUtil.getValueByField(binLogBean, "idtfno");
                return new Tuple2<String, String>(case_id, idtfno);
            }
        });
        tEnv.registerDataStream("cm_customer", kafkaSource2, "case_id, idtfno");

        // Join the two registered tables; note this is a LEFT JOIN, not an inner join
        Table result = tEnv.sqlQuery("select a.*,b.idtfno " +
                "from app_case a left join cm_customer b on a.case_id = b.case_id " +
                "where a.close_time not in('')");

        // Keep only accumulate messages (f0 == true) from the retract stream
        tEnv.toRetractStream(result, Row.class, queryConfig).filter(new FilterFunction<Tuple2<Boolean, Row>>() {
            @Override
            public boolean filter(Tuple2<Boolean, Row> booleanRowTuple2) throws Exception {
                return booleanRowTuple2.f0;
            }
        }).print();

        env.execute();
    }
}

```
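
Note that the issue title says inner join, but the query above actually uses a LEFT JOIN. In Flink SQL a plain "join" is an inner join, so the inner-join form of the same query, over the same two registered tables, would be:

```

// Inner-join variant of the query above; same registered tables and columns.
Table result = tEnv.sqlQuery("select a.*, b.idtfno " +
        "from app_case a join cm_customer b on a.case_id = b.case_id " +
        "where a.close_time not in('')");

```

With an inner join, a row is only emitted once a record with a matching case_id has arrived on both topics, so if neither variant produces output it is worth checking that the extracted case_id values actually match across the two streams.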

 

BinLogBean.java

```

public class BinLogBean implements Serializable {
    private String instance;
    private int version;
    private Long serverId;

    private String executeTime;
    private String logfileName;
    private Long logfileOffset;
    /**
     * database name
     */
    private String schemaName;
    private String tableName;
    private String eventType;
    private List<ColumnField> columnFieldsList;

    public String getInstance() {
        return instance;
    }

    public void setInstance(String instance) {
        this.instance = instance;
    }

    public List<ColumnField> getColumnFieldsList() {
        return columnFieldsList;
    }

    public void setColumnFieldsList(List<ColumnField> columnFieldsList) {
        this.columnFieldsList = columnFieldsList;
    }

    public int getVersion() {
        return version;
    }

    public void setVersion(int version) {
        this.version = version;
    }

    public Long getServerId() {
        return serverId;
    }

    public void setServerId(Long serverId) {
        this.serverId = serverId;
    }

    public String getExecuteTime() {
        return executeTime;
    }

    public void setExecuteTime(String executeTime) {
        this.executeTime = executeTime;
    }

    public String getLogfileName() {
        return logfileName;
    }

    public void setLogfileName(String logfileName) {
        this.logfileName = logfileName;
    }

    public Long getLogfileOffset() {
        return logfileOffset;
    }

    public void setLogfileOffset(Long logfileOffset) {
        this.logfileOffset = logfileOffset;
    }

    public String getSchemaName() {
        return schemaName;
    }

    public void setSchemaName(String schemaName) {
        this.schemaName = schemaName;
    }

    public String getTableName() {
        return tableName;
    }

    public void setTableName(String tableName) {
        this.tableName = tableName;
    }

    public String getEventType() {
        return eventType;
    }

    public void setEventType(String eventType) {
        this.eventType = eventType;
    }

}

```
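
BinLogUtil is not included in this report. For illustration only, getValueByField is a lookup over columnFieldsList, roughly along these lines (ColumnField is also not shown, so its accessor names getFieldName()/getValue() are assumptions):

```

public class BinLogUtil {
    // Hypothetical sketch: return the value of the named column in a binlog
    // record, or null if the column is not present. The accessor names on
    // ColumnField are assumed, since that class is not part of the report.
    public static String getValueByField(BinLogBean bean, String field) {
        if (bean.getColumnFieldsList() == null) {
            return null;
        }
        for (ColumnField cf : bean.getColumnFieldsList()) {
            if (field.equals(cf.getFieldName())) {
                return cf.getValue();
            }
        }
        return null;
    }
}

```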


