Posted to dev@flink.apache.org by "xingyuan cheng (Jira)" <ji...@apache.org> on 2020/04/21 11:54:00 UTC
[jira] [Created] (FLINK-17304) Kafka two streams cannot use Flink SQL to query inner join
xingyuan cheng created FLINK-17304:
--------------------------------------
Summary: Kafka two streams cannot use Flink SQL to query inner join
Key: FLINK-17304
URL: https://issues.apache.org/jira/browse/FLINK-17304
Project: Flink
Issue Type: Bug
Components: API / DataStream, Table SQL / API
Affects Versions: 1.9.0
Environment: flink.version=1.9.0
scala.binary.version=2.11
Reporter: xingyuan cheng
In my work, I found that when I subscribe to DataStreams from two different Kafka topics, the operators on each stream execute as expected, but the join I run through Flink SQL does not return the results I expect. What do I need to do to make it work? The code is below.
TestStreamSQL.java
```
public class TestStreamSQL {
    private static Logger log = LoggerFactory.getLogger(TestStreamSQL.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // env.setStateBackend(new FsStateBackend("hdfs://ido001:8020/user/lwj/flink/checkpoint"));

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        StreamQueryConfig queryConfig = new StreamQueryConfig();
        queryConfig.withIdleStateRetentionTime(Time.days(10), Time.days(30));

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "ido001.gzcb.com:9092,ido002.gzcb.com:9092,ido003.gzcb.com:9092");
        properties.setProperty("group.id", "flink");
        String topic_1 = "bps-16-r3p3";
        String topic_2 = "bps-16-r3p4";

        DataStreamSource<String> topic1 = env.addSource(new FlinkKafkaConsumer010<String>(topic_1, new SimpleStringSchema(), properties));
        SingleOutputStreamOperator<Tuple3<String, String, String>> kafkaSource1 = topic1.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                try {
                    BinLogBean binLogBean = JSONObject.parseObject(value, BinLogBean.class);
                    return "app_case".equals(binLogBean.getTableName());
                } catch (Exception e) {
                    log.error("JSON conversion failed, str={}", value, e);
                    return false;
                }
            }
        }).map(new MapFunction<String, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(String s) throws Exception {
                BinLogBean binLogBean = JSONObject.parseObject(s, BinLogBean.class);
                String case_id = BinLogUtil.getValueByField(binLogBean, "case_id");
                String close_time = BinLogUtil.getValueByField(binLogBean, "close_time");
                String approve_result = BinLogUtil.getValueByField(binLogBean, "approve_result");
                return new Tuple3<String, String, String>(case_id, close_time, approve_result);
            }
        });
        tEnv.registerDataStream("app_case", kafkaSource1, "case_id, close_time, approve_result");

        DataStreamSource<String> topic2 = env.addSource(new FlinkKafkaConsumer010<String>(topic_2, new SimpleStringSchema(), properties));
        SingleOutputStreamOperator<Tuple2<String, String>> kafkaSource2 = topic2.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                try {
                    BinLogBean binLogBean = JSONObject.parseObject(value, BinLogBean.class);
                    return "cm_customer".equals(binLogBean.getTableName());
                } catch (Exception e) {
                    log.error("JSON conversion failed, str={}", value, e);
                    return false;
                }
            }
        }).map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String s) throws Exception {
                BinLogBean binLogBean = JSONObject.parseObject(s, BinLogBean.class);
                String case_id = BinLogUtil.getValueByField(binLogBean, "case_id");
                String idtfno = BinLogUtil.getValueByField(binLogBean, "idtfno");
                return new Tuple2<String, String>(case_id, idtfno);
            }
        });
        tEnv.registerDataStream("cm_customer", kafkaSource2, "case_id, idtfno");

        Table result = tEnv.sqlQuery("select a.*, b.idtfno " +
                "from app_case a left join cm_customer b on a.case_id = b.case_id " +
                "where a.close_time not in('')");

        tEnv.toRetractStream(result, Row.class, queryConfig).filter(new FilterFunction<Tuple2<Boolean, Row>>() {
            @Override
            public boolean filter(Tuple2<Boolean, Row> booleanRowTuple2) throws Exception {
                return booleanRowTuple2.f0;
            }
        }).print();

        env.execute();
    }
}
```
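To illustrate the pattern I expect to work, here is a stripped-down, self-contained version of the same register-and-join logic using in-memory elements instead of the Kafka topics (the class name, tuple values, and the plain inner join below are made up for illustration, not the production job):
```
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class JoinPatternSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // In-memory stand-ins for the two Kafka topics (values are made up).
        DataStream<Tuple3<String, String, String>> cases = env.fromElements(
                Tuple3.of("1", "2020-04-21 10:00:00", "APPROVED"));
        DataStream<Tuple2<String, String>> customers = env.fromElements(
                Tuple2.of("1", "ID-0001"));

        // Same registration as in the job above.
        tEnv.registerDataStream("app_case", cases, "case_id, close_time, approve_result");
        tEnv.registerDataStream("cm_customer", customers, "case_id, idtfno");

        Table joined = tEnv.sqlQuery(
                "SELECT a.case_id, a.close_time, a.approve_result, b.idtfno " +
                "FROM app_case a JOIN cm_customer b ON a.case_id = b.case_id");

        // Retract stream: keep only the accumulate (true) messages, as in the job above.
        tEnv.toRetractStream(joined, Row.class)
                .filter(t -> t.f0)
                .print();

        env.execute("join-pattern-sketch");
    }
}
```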
BinLogBean.java
```
public class BinLogBean implements Serializable {
    private String instance;
    private int version;
    private Long serverId;
    private String executeTime;
    private String logfileName;
    private Long logfileOffset;
    /** database name */
    private String schemaName;
    private String tableName;
    private String eventType;
    private List<ColumnField> columnFieldsList;

    public String getInstance() {
        return instance;
    }

    public void setInstance(String instance) {
        this.instance = instance;
    }

    public List<ColumnField> getColumnFieldsList() {
        return columnFieldsList;
    }

    public void setColumnFieldsList(List<ColumnField> columnFieldsList) {
        this.columnFieldsList = columnFieldsList;
    }

    public int getVersion() {
        return version;
    }

    public void setVersion(int version) {
        this.version = version;
    }

    public Long getServerId() {
        return serverId;
    }

    public void setServerId(Long serverId) {
        this.serverId = serverId;
    }

    public String getExecuteTime() {
        return executeTime;
    }

    public void setExecuteTime(String executeTime) {
        this.executeTime = executeTime;
    }

    public String getLogfileName() {
        return logfileName;
    }

    public void setLogfileName(String logfileName) {
        this.logfileName = logfileName;
    }

    public Long getLogfileOffset() {
        return logfileOffset;
    }

    public void setLogfileOffset(Long logfileOffset) {
        this.logfileOffset = logfileOffset;
    }

    public String getSchemaName() {
        return schemaName;
    }

    public void setSchemaName(String schemaName) {
        this.schemaName = schemaName;
    }

    public String getTableName() {
        return tableName;
    }

    public void setTableName(String tableName) {
        this.tableName = tableName;
    }

    public String getEventType() {
        return eventType;
    }

    public void setEventType(String eventType) {
        this.eventType = eventType;
    }
}
```
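ColumnField and BinLogUtil are small internal helpers and are not attached; they look roughly like the following (sketched here for illustration only, the actual field and method names may differ):
```
// Sketch only: the real ColumnField/BinLogUtil are not attached to this report,
// so the exact shapes below are assumed.
public class ColumnField implements Serializable {
    private String name;
    private String value;

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getValue() { return value; }
    public void setValue(String value) { this.value = value; }
}

public class BinLogUtil {
    /** Returns the value of the named column in a binlog row, or null if it is absent. */
    public static String getValueByField(BinLogBean bean, String field) {
        if (bean.getColumnFieldsList() == null) {
            return null;
        }
        for (ColumnField cf : bean.getColumnFieldsList()) {
            if (field.equals(cf.getName())) {
                return cf.getValue();
            }
        }
        return null;
    }
}
```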
--
This message was sent by Atlassian Jira
(v8.3.4#803005)