You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/04/22 11:46:38 UTC

[GitHub] [flink] lccbiluox2 commented on issue #2894: [FLINK-5184] fix bug: Error result of compareSerialized in RowComparator class

lccbiluox2 commented on issue #2894:
URL: https://github.com/apache/flink/pull/2894#issuecomment-617727722


   我有一个程序
   
   ```
   public class ControlEvent extends Row {
       private String id;
       private String name;
   
       /**
        * Create a new Row instance.
        *
        * @param arity The number of fields in the Row
        */
       public ControlEvent(int arity) {
           super(arity);
       }
   }
   ```
   大概是这样的
   ```
   public class FilterLabel extends CoProcessFunction<Row, ControlEvent,Row> {
   
       @Override
       public void processElement1(Row value, Context ctx, Collector<Row> out) throws Exception {
           Row newRow = useNewRule(value);
           System.out.println("测试两个out是否为一个对象1:"+out.toString());
           out.collect(newRow);
       }
   
       /**
        * 使用新的规则
        * @return
        */
       private Row useNewRule(Row value) {
           // 查询数据库
           // 加载新的规则,生产新的row
           return value;
       }
   
       @Override
       public void processElement2(ControlEvent value, Context ctx, Collector<Row> out) throws Exception {
           reloadRule();
           System.out.println("测试两个out是否为一个对象2:"+out.toString());
           // 控制流加载配置后往后面传递
           out.collect(value);
       }
   
       /**
        * 重新加载规则,并且预编译
        * @throws InterruptedException
        */
       private void reloadRule() throws InterruptedException {
   //        System.out.println("进行重新加载配置操作-开始");
   //        Thread.sleep(1000L);
   //        System.out.println("进行重新加载配置操作-结束");
       }
   }
   
   ```
   ![image](https://user-images.githubusercontent.com/20364465/79977937-afc25000-84d1-11ea-8f01-51971d8a4565.png)
   
   这里原本设计A B两条线是断开的,不能往下传递,因此需要广播,广播到每个节点,这次想修改就是想打通AB两条线,到下面的算子并且由`processElement2`方法去操作,但是实际测试
   ```
   
   processElement1(Row value, Context ctx, Collector<Row> out) 
   processElement2(ControlEvent value, Context ctx, Collector<Row> out) 
   ```
   这两个的out是一个对象,因此会发送到一个流中,而且CoProcessFunction定义就是两个输入,一个输出,输出的类型必须一致
   ```
   
   public abstract class CoProcessFunction<IN1, IN2, OUT> 
   ```
   
   所以上述如果直接输出是到了一个流中了如图CD。想有个算法是这样的
   ```
   
   public abstract class xxxProcessFunction<IN1, IN2, OUT1, OUT2> 
   ```
   但是发现没有


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org