You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "benwang li (JIRA)" <ji...@apache.org> on 2018/03/01 02:27:00 UTC
[jira] [Created] (FLINK-8815) EventTime won't work as reduce
benwang li created FLINK-8815:
---------------------------------
Summary: EventTime won't work as reduce
Key: FLINK-8815
URL: https://issues.apache.org/jira/browse/FLINK-8815
Project: Flink
Issue Type: Bug
Components: DataStream API
Affects Versions: 1.4.1
Environment:
Main Code
{code:java}
public class StreamingJob {
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ObjectMapper jsonMapper = new ObjectMapper();
Properties properties = new Properties();
// String brokers = "172.27.138.8:9092";
String brokers = "localhost:9092";
properties.setProperty("bootstrap.servers", brokers);
properties.setProperty("group.id", "test_fink");
String topic = "stream_test";
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
FlinkKafkaConsumer010<BitRate> myConsumer =
new FlinkKafkaConsumer010(topic, new BitRate.BitRateDeserializtionSchema(), properties);
DataStream<BitRate> stream = env.addSource(myConsumer).assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
DataStream<BitRate>
reduceItems =
stream
.keyBy(a -> a.gameId)
.timeWindow(Time.seconds(1))
.reduce((a, b) -> a.add(b));
reduceItems.print(); //never print
reduceItems.addSink(new FlinkKafkaProducer010<>(brokers, "stream_sink", (tuple) -> {
try {
tuple.end();
System.out.println(tuple.rate + "\t" + tuple.user); //never print
return jsonMapper.writeValueAsBytes(tuple);
} catch (JsonProcessingException e) {
e.printStackTrace();
return "".getBytes();
}
}));
env.execute("Flink Streaming Java API Skeleton");
}
{code}
The reduceItems will never print, But the bitrate add method print logs.
My log is simple like this, all the log are generated in time.
{code:java}
4281_783_1519827320460
7347_939_1519827320460
3281_984_1519827320460
8225_810_1519827320460
3956_920_1519827320460
6594_815_1519827320460
5962_925_1519827320460
4028_854_1519827320460
811_875_1519827320460
3837_974_1519827320460
{code}
My Event BitRate
{code:java}
public class BitRate {
public long eventTime;
public long gameId;
public long rate;
public long user;
public long startTs;
public long endTs;
public int type;
public BitRate() {
}
public BitRate(long eventTime, long gameId, long rate, long user) {
this.eventTime = eventTime;
this.gameId = gameId;
this.rate = rate;
this.user = user;
this.startTs = System.currentTimeMillis();
this.type = 0;
}
public void end() {
this.endTs = System.currentTimeMillis();
// System.out.println("end" + this.user);
}
public BitRate add(BitRate b) {
this.rate += b.rate;
this.user += b.user;
// System.out.println("add" + b.user);
return this;
}
{code}
My CustomWatermarkEmitter
{code:java}
public class CustomWatermarkEmitter implements AssignerWithPeriodicWatermarks<BitRate> {
@Nullable
@Override
public Watermark getCurrentWatermark() {
// System.out.println("get=>" + currentTs +maxTimeLag);
return new Watermark(System.currentTimeMillis());
}
@Override
public long extractTimestamp(BitRate bitRate, long l) {
// System.out.println("extract"+bitRate.startTs + ":" + l);
return bitRate.eventTime;
}
}
{code}
Reporter: benwang li
I Use the EventTime option to do the window reduce operation.But the reduce result stream got nothing.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)