You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Jark Wu (Jira)" <ji...@apache.org> on 2020/04/21 12:37:00 UTC

[jira] [Commented] (FLINK-17304) Kafka two streams cannot use Flink SQL to query inner join

    [ https://issues.apache.org/jira/browse/FLINK-17304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17088637#comment-17088637 ] 

Jark Wu commented on FLINK-17304:
---------------------------------

What the problem do you encountered?  If there is an exception, you can post the exception stack in the description. So that we can look into it. 

> Kafka two streams cannot use Flink SQL to query inner join
> ----------------------------------------------------------
>
>                 Key: FLINK-17304
>                 URL: https://issues.apache.org/jira/browse/FLINK-17304
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, Table SQL / API
>    Affects Versions: 1.9.0
>         Environment: flink.version=1.9.0
> scala.binary.version=2.11
>            Reporter: xingyuan cheng
>            Priority: Major
>
> In my work, I found that when subscribing datastream from two different topics of Kafka, the operator operations of the two streams can be executed respectively, but in the end, I did not query the inner join through Flink SQL as expected. What do I need to do to make it work?
> TestStreamSQL.java
> ```
> public class TestStreamSQL {
>  private static Logger log = LoggerFactory.getLogger(BinlogStreamSQL.class);
>  public static void main(String[] args) throws Exception {
>  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>  env.enableCheckpointing(1000);
>  env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>  env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
>  env.getCheckpointConfig().setCheckpointTimeout(60000);
>  env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>  env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> // env.setStateBackend(new FsStateBackend("hdfs://ido001:8020/user/lwj/flink/checkpoint"));
>  StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>  StreamQueryConfig queryConfig = new StreamQueryConfig();
>  queryConfig.withIdleStateRetentionTime(Time.days(10), Time.days(30));
>  Properties properties = new Properties();
>  properties.setProperty("bootstrap.servers", "ido001.gzcb.com:9092,ido002.gzcb.com:9092,ido003.gzcb.com:9092");
>  properties.setProperty("group.id", "flink");
>  String topic_1 = "bps-16-r3p3";
>  String topic_2 = "bps-16-r3p4";
>  DataStreamSource<String> topic1 = env.addSource(new FlinkKafkaConsumer010<String>(topic_1, new SimpleStringSchema(), properties));
>  SingleOutputStreamOperator<Tuple3<String, String, String>> kafkaSource1 = topic1.filter(new FilterFunction<String>() {
>  @Override
>  public boolean filter(String value) throws Exception {
>  try {
>  BinLogBean binLogBean = JSONObject.parseObject(value, BinLogBean.class);
>  if ("app_case".equals(binLogBean.getTableName())){
>  return true;
>  }else {
>  return false;
>  }
>  }catch (Exception e){
>  log.error("JSON转换失败,str={}", value, e);
>  return false;
>  }
>  }
>  }).map(new MapFunction<String, Tuple3<String, String, String>>() {
>  @Override
>  public Tuple3<String, String, String> map(String s) throws Exception {
>  BinLogBean binLogBean = JSONObject.parseObject(s, BinLogBean.class);
>  String case_id = BinLogUtil.getValueByField(binLogBean, "case_id");
>  String close_time = BinLogUtil.getValueByField(binLogBean, "close_time");
>  String approve_result = BinLogUtil.getValueByField(binLogBean, "approve_result");
>  return new Tuple3<String, String, String>(case_id, close_time, approve_result);
>  }
>  });
>  tEnv.registerDataStream("app_case", kafkaSource1, "case_id, close_time, approve_result");
>  DataStreamSource<String> topic2 = env.addSource(new FlinkKafkaConsumer010<String>(topic_2, new SimpleStringSchema(), properties));
>  SingleOutputStreamOperator<Tuple2<String, String>> kafkaSource2 = topic2.filter(new FilterFunction<String>() {
>  @Override
>  public boolean filter(String value) throws Exception {
>  try {
>  BinLogBean binLogBean = JSONObject.parseObject(value, BinLogBean.class);
>  if ("cm_customer".equals(binLogBean.getTableName())){
>  return true;
>  }else {
>  return false;
>  }
>  }catch (Exception e){
>  log.error("JSON转换失败,str={}", value, e);
>  return false;
>  }
>  }
>  }).map(new MapFunction<String, Tuple2<String, String>>() {
>  @Override
>  public Tuple2<String, String> map(String s) throws Exception {
>  BinLogBean binLogBean = JSONObject.parseObject(s, BinLogBean.class);
>  String case_id = BinLogUtil.getValueByField(binLogBean, "case_id");
>  String idtfno = BinLogUtil.getValueByField(binLogBean, "idtfno");
>  return new Tuple2<String, String>(case_id, idtfno);
>  }
>  });
>  tEnv.registerDataStream("cm_customer", kafkaSource2, "case_id, idtfno");
>  Table result = tEnv.sqlQuery("select a.*,b.idtfno " +
>  "from app_case a left join cm_customer b on a.case_id = b.case_id " +
>  "where a.close_time not in('')");
>  tEnv.toRetractStream(result, Row.class, queryConfig).filter(new FilterFunction<Tuple2<Boolean, Row>>() {
>  @Override
>  public boolean filter(Tuple2<Boolean, Row> booleanRowTuple2) throws Exception {
>  return booleanRowTuple2.f0;
>  }
>  }).print();
>  env.execute();
>  }
> }
> ```
>  
> BinLogBean.java
> ```
> public class BinLogBean implements Serializable{
>  private String instance;
>  private int version;
>  private Long serverId;
>  private String executeTime;
>  private String logfileName;
>  private Long logfileOffset;
>  /**
>  * database name
>  */
>  private String schemaName;
>  private String tableName;
>  private String eventType;
>  private List<ColumnField> columnFieldsList;
>  public String getInstance() {
>  return instance;
>  }
>  public void setInstance(String instance) {
>  this.instance = instance;
>  }
>  public List<ColumnField> getColumnFieldsList() {
>  return columnFieldsList;
>  }
>  public void setColumnFieldsList(List<ColumnField> columnFieldsList) {
>  this.columnFieldsList = columnFieldsList;
>  }
>  public int getVersion() {
>  return version;
>  }
>  public void setVersion(int version) {
>  this.version = version;
>  }
>  public Long getServerId() {
>  return serverId;
>  }
>  public void setServerId(Long serverId) {
>  this.serverId = serverId;
>  }
>  public String getExecuteTime() {
>  return executeTime;
>  }
>  public void setExecuteTime(String executeTime) {
>  this.executeTime = executeTime;
>  }
>  public String getLogfileName() {
>  return logfileName;
>  }
>  public void setLogfileName(String logfileName) {
>  this.logfileName = logfileName;
>  }
>  public Long getLogfileOffset() {
>  return logfileOffset;
>  }
>  public void setLogfileOffset(Long logfileOffset) {
>  this.logfileOffset = logfileOffset;
>  }
>  public String getSchemaName() {
>  return schemaName;
>  }
>  public void setSchemaName(String schemaName) {
>  this.schemaName = schemaName;
>  }
>  public String getTableName() {
>  return tableName;
>  }
>  public void setTableName(String tableName) {
>  this.tableName = tableName;
>  }
>  public String getEventType() {
>  return eventType;
>  }
>  public void setEventType(String eventType) {
>  this.eventType = eventType;
>  }
> }
> ```



--
This message was sent by Atlassian Jira
(v8.3.4#803005)