You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Jark Wu (Jira)" <ji...@apache.org> on 2020/04/21 12:37:00 UTC
[jira] [Commented] (FLINK-17304) Kafka two streams cannot use Flink
SQL to query inner join
[ https://issues.apache.org/jira/browse/FLINK-17304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17088637#comment-17088637 ]
Jark Wu commented on FLINK-17304:
---------------------------------
What problem did you encounter? If there is an exception, please post the exception stack trace in the description so that we can look into it.
> Kafka two streams cannot use Flink SQL to query inner join
> ----------------------------------------------------------
>
> Key: FLINK-17304
> URL: https://issues.apache.org/jira/browse/FLINK-17304
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream, Table SQL / API
> Affects Versions: 1.9.0
> Environment: flink.version=1.9.0
> scala.binary.version=2.11
> Reporter: xingyuan cheng
> Priority: Major
>
> In my work, I found that when subscribing to DataStreams from two different Kafka topics, the operators on each of the two streams execute fine on their own, but in the end the inner join query through Flink SQL did not work as expected. What do I need to do to make it work?
> TestStreamSQL.java
> ```
> public class TestStreamSQL { // Reads two Kafka topics, registers each filtered/mapped stream as a table, joins them with SQL and prints the result.
> private static Logger log = LoggerFactory.getLogger(BinlogStreamSQL.class); // NOTE(review): logger is created for BinlogStreamSQL, not TestStreamSQL - confirm this is intended
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.enableCheckpointing(1000); // checkpoint interval; presumably milliseconds - confirm against the Flink API
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
> env.getCheckpointConfig().setCheckpointTimeout(60000);
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> // env.setStateBackend(new FsStateBackend("hdfs://ido001:8020/user/lwj/flink/checkpoint"));
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> StreamQueryConfig queryConfig = new StreamQueryConfig();
> queryConfig.withIdleStateRetentionTime(Time.days(10), Time.days(30)); // presumably (min, max) idle state retention - confirm against the Flink API
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", "ido001.gzcb.com:9092,ido002.gzcb.com:9092,ido003.gzcb.com:9092");
> properties.setProperty("group.id", "flink");
> String topic_1 = "bps-16-r3p3";
> String topic_2 = "bps-16-r3p4";
> DataStreamSource<String> topic1 = env.addSource(new FlinkKafkaConsumer010<String>(topic_1, new SimpleStringSchema(), properties)); // first source: string records from topic_1
> SingleOutputStreamOperator<Tuple3<String, String, String>> kafkaSource1 = topic1.filter(new FilterFunction<String>() { // keep only records whose tableName is "app_case"
> @Override
> public boolean filter(String value) throws Exception {
> try {
> BinLogBean binLogBean = JSONObject.parseObject(value, BinLogBean.class);
> if ("app_case".equals(binLogBean.getTableName())){
> return true;
> }else {
> return false;
> }
> }catch (Exception e){ // any failure while parsing/inspecting the record is logged and the record is dropped
> log.error("JSON转换失败,str={}", value, e);
> return false;
> }
> }
> }).map(new MapFunction<String, Tuple3<String, String, String>>() { // parses the JSON a second time and extracts three values into a Tuple3
> @Override
> public Tuple3<String, String, String> map(String s) throws Exception {
> BinLogBean binLogBean = JSONObject.parseObject(s, BinLogBean.class);
> String case_id = BinLogUtil.getValueByField(binLogBean, "case_id");
> String close_time = BinLogUtil.getValueByField(binLogBean, "close_time");
> String approve_result = BinLogUtil.getValueByField(binLogBean, "approve_result");
> return new Tuple3<String, String, String>(case_id, close_time, approve_result);
> }
> });
> tEnv.registerDataStream("app_case", kafkaSource1, "case_id, close_time, approve_result"); // expose the first stream as table "app_case"
> DataStreamSource<String> topic2 = env.addSource(new FlinkKafkaConsumer010<String>(topic_2, new SimpleStringSchema(), properties)); // second source: string records from topic_2, same consumer properties
> SingleOutputStreamOperator<Tuple2<String, String>> kafkaSource2 = topic2.filter(new FilterFunction<String>() { // keep only records whose tableName is "cm_customer"
> @Override
> public boolean filter(String value) throws Exception {
> try {
> BinLogBean binLogBean = JSONObject.parseObject(value, BinLogBean.class);
> if ("cm_customer".equals(binLogBean.getTableName())){
> return true;
> }else {
> return false;
> }
> }catch (Exception e){ // any failure while parsing/inspecting the record is logged and the record is dropped
> log.error("JSON转换失败,str={}", value, e);
> return false;
> }
> }
> }).map(new MapFunction<String, Tuple2<String, String>>() { // parses the JSON a second time and extracts two values into a Tuple2
> @Override
> public Tuple2<String, String> map(String s) throws Exception {
> BinLogBean binLogBean = JSONObject.parseObject(s, BinLogBean.class);
> String case_id = BinLogUtil.getValueByField(binLogBean, "case_id");
> String idtfno = BinLogUtil.getValueByField(binLogBean, "idtfno");
> return new Tuple2<String, String>(case_id, idtfno);
> }
> });
> tEnv.registerDataStream("cm_customer", kafkaSource2, "case_id, idtfno"); // expose the second stream as table "cm_customer"
> Table result = tEnv.sqlQuery("select a.*,b.idtfno " + // NOTE(review): the query below uses LEFT JOIN although the issue title talks about an inner join - confirm which is intended
> "from app_case a left join cm_customer b on a.case_id = b.case_id " +
> "where a.close_time not in('')");
> tEnv.toRetractStream(result, Row.class, queryConfig).filter(new FilterFunction<Tuple2<Boolean, Row>>() { // keeps only tuples whose Boolean flag f0 is true; presumably the "add" messages of the retract stream - confirm
> @Override
> public boolean filter(Tuple2<Boolean, Row> booleanRowTuple2) throws Exception {
> return booleanRowTuple2.f0;
> }
> }).print();
> env.execute(); // submits the job; nothing above runs until this call
> }
> }
> ```
>
> BinLogBean.java
> ```
> public class BinLogBean implements Serializable{ // Serializable bean with plain getters/setters; target type of JSONObject.parseObject in TestStreamSQL; no serialVersionUID is declared
> private String instance;
> private int version;
> private Long serverId;
> private String executeTime;
> private String logfileName;
> private Long logfileOffset;
> /**
> * database name
> */
> private String schemaName;
> private String tableName; // compared against "app_case" / "cm_customer" by the filters in TestStreamSQL
> private String eventType;
> private List<ColumnField> columnFieldsList; // presumably the column values read by BinLogUtil.getValueByField - that helper is not shown, confirm
> public String getInstance() {
> return instance;
> }
> public void setInstance(String instance) {
> this.instance = instance;
> }
> public List<ColumnField> getColumnFieldsList() {
> return columnFieldsList;
> }
> public void setColumnFieldsList(List<ColumnField> columnFieldsList) {
> this.columnFieldsList = columnFieldsList;
> }
> public int getVersion() {
> return version;
> }
> public void setVersion(int version) {
> this.version = version;
> }
> public Long getServerId() {
> return serverId;
> }
> public void setServerId(Long serverId) {
> this.serverId = serverId;
> }
> public String getExecuteTime() {
> return executeTime;
> }
> public void setExecuteTime(String executeTime) {
> this.executeTime = executeTime;
> }
> public String getLogfileName() {
> return logfileName;
> }
> public void setLogfileName(String logfileName) {
> this.logfileName = logfileName;
> }
> public Long getLogfileOffset() {
> return logfileOffset;
> }
> public void setLogfileOffset(Long logfileOffset) {
> this.logfileOffset = logfileOffset;
> }
> public String getSchemaName() {
> return schemaName;
> }
> public void setSchemaName(String schemaName) {
> this.schemaName = schemaName;
> }
> public String getTableName() {
> return tableName;
> }
> public void setTableName(String tableName) {
> this.tableName = tableName;
> }
> public String getEventType() {
> return eventType;
> }
> public void setEventType(String eventType) {
> this.eventType = eventType;
> }
> }
> ```
--
This message was sent by Atlassian Jira
(v8.3.4#803005)