Posted to issues@flink.apache.org by "benwang li (JIRA)" <ji...@apache.org> on 2018/03/01 02:27:00 UTC

[jira] [Created] (FLINK-8815) EventTime won't work as reduce

benwang li created FLINK-8815:
---------------------------------

             Summary: EventTime won't work as reduce
                 Key: FLINK-8815
                 URL: https://issues.apache.org/jira/browse/FLINK-8815
             Project: Flink
          Issue Type: Bug
          Components: DataStream API
    Affects Versions: 1.4.1
         Environment:  

Main Code 

 
{code:java}
public class StreamingJob {
  public static void main(String[] args) throws Exception {
    // set up the streaming execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    ObjectMapper jsonMapper = new ObjectMapper();

    Properties properties = new Properties();
//    String brokers = "172.27.138.8:9092";
    String brokers = "localhost:9092";
    properties.setProperty("bootstrap.servers", brokers);
    properties.setProperty("group.id", "test_fink");
    String topic = "stream_test";

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    FlinkKafkaConsumer010<BitRate> myConsumer =
        new FlinkKafkaConsumer010<>(topic, new BitRate.BitRateDeserializtionSchema(), properties);

    DataStream<BitRate> stream = env.addSource(myConsumer).assignTimestampsAndWatermarks(new CustomWatermarkEmitter());

    DataStream<BitRate>
        reduceItems =
        stream
            .keyBy(a -> a.gameId)
            .timeWindow(Time.seconds(1))
            .reduce((a, b) -> a.add(b));

    reduceItems.print(); //never print
    reduceItems.addSink(new FlinkKafkaProducer010<>(brokers, "stream_sink", (tuple) -> {
      try {
        tuple.end();
        System.out.println(tuple.rate + "\t" + tuple.user); //never print
        return jsonMapper.writeValueAsBytes(tuple);
      } catch (JsonProcessingException e) {
        e.printStackTrace();
        return "".getBytes();
      }
    }));

    env.execute("Flink Streaming Java API Skeleton");
  }
}
{code}
 

The reduceItems stream never prints anything, but the BitRate add method does print its logs.

My input log is simple and looks like this. All records are generated in time.
{code:java}
4281_783_1519827320460
7347_939_1519827320460
3281_984_1519827320460
8225_810_1519827320460
3956_920_1519827320460
6594_815_1519827320460
5962_925_1519827320460
4028_854_1519827320460
811_875_1519827320460
3837_974_1519827320460
{code}
 

 

 

My Event BitRate

 
{code:java}
public class BitRate {
  public long eventTime;

  public long gameId;
  public long rate;
  public long user;
  public long startTs;
  public long endTs;
  public int type;

  public BitRate() {
  }

  public BitRate(long eventTime, long gameId, long rate, long user) {
    this.eventTime = eventTime;

    this.gameId = gameId;
    this.rate = rate;
    this.user = user;
    this.startTs = System.currentTimeMillis();
    this.type = 0;
  }

  public void end() {
    this.endTs = System.currentTimeMillis();
//    System.out.println("end" + this.user);
  }

  public BitRate add(BitRate b) {
    this.rate += b.rate;
    this.user += b.user;
//    System.out.println("add" + b.user);
    return this;
  }
  // (the nested BitRateDeserializtionSchema used in the main code is not shown in this report)
}
{code}
 

 

My CustomWatermarkEmitter
{code:java}
public class CustomWatermarkEmitter implements AssignerWithPeriodicWatermarks<BitRate> {
  @Nullable
  @Override
  public Watermark getCurrentWatermark() {
    // System.out.println("get=>" + currentTs +maxTimeLag);
    return new Watermark(System.currentTimeMillis());
  }

  @Override
  public long extractTimestamp(BitRate bitRate, long l) {
    // System.out.println("extract"+bitRate.startTs + ":" + l);
    return bitRate.eventTime;
  }
}
{code}
 

 
            Reporter: benwang li


I use the EventTime option to do a window reduce operation, but the resulting stream produces nothing.
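
For reference, here is a minimal plain-Java sketch (no Flink dependency) of the general event-time rule for a 1-second tumbling window: the window covering [start, start + size) can only be emitted once the watermark has reached its last timestamp, start + size - 1. The class and method names are hypothetical and exist only for illustration. This shows the rule under the assumption of a zero window offset and non-negative timestamps. It is not a diagnosis of this issue.

```java
public class EventTimeWindowSketch {

  // Start of the tumbling window that contains the timestamp
  // (assumes offset 0 and a non-negative timestamp).
  static long windowStart(long timestamp, long sizeMs) {
    return timestamp - (timestamp % sizeMs);
  }

  // Last timestamp that still belongs to the window starting at 'start'.
  static long maxTimestamp(long start, long sizeMs) {
    return start + sizeMs - 1;
  }

  // An event-time window may fire once the watermark has reached its last timestamp.
  static boolean canFire(long start, long sizeMs, long watermark) {
    return watermark >= maxTimestamp(start, sizeMs);
  }

  public static void main(String[] args) {
    long sizeMs = 1000L;
    long eventTime = 1519827320460L; // timestamp taken from the sample log in this report
    long start = windowStart(eventTime, sizeMs);
    long last = maxTimestamp(start, sizeMs);

    System.out.println("window = [" + start + ", " + (start + sizeMs) + ")");
    System.out.println("can fire at watermark " + eventTime + ": " + canFire(start, sizeMs, eventTime));
    System.out.println("can fire at watermark " + last + ": " + canFire(start, sizeMs, last));
  }
}
```

With the sample timestamp, the window is [1519827320000, 1519827321000), so a watermark equal to the event's own timestamp is not yet enough to fire it.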

 

 


