Posted to user@flink.apache.org by Jim Chen <ch...@gmail.com> on 2020/03/26 07:52:44 UTC

When i use the Tumbling Windows, find lost some record

Hi, All

  When I use tumbling windows, some records appear to be lost. My code is as follows:

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    env.addSource(FlinkKafkaConsumer011......)
        // Watermarks trail the highest logTime seen so far by 3 minutes.
        .assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.minutes(3)) {
                @Override
                public long extractTimestamp(JSONObject jsonObject) {
                    long logTime = jsonObject.getLongValue("logTime");
                    return logTime;
                }
            })
        .keyBy(jsonObject -> {
            return jsonObject.getString("userId");
        })
        // 30-second tumbling event-time windows per userId.
        .timeWindow(Time.seconds(30))
        .process(new ProcessWindowFunction<JSONObject, JSONObject, String, TimeWindow>() {
            @Override
            public void process(String key, Context context,
                    Iterable<JSONObject> iterable, Collector<JSONObject> collector) throws Exception {
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                String start = sdf.format(new Date(context.window().getStart()));
                String end = sdf.format(new Date(context.window().getEnd()));
                System.out.println(start + "----" + end);
                // Forward every record of the window unchanged.
                for (JSONObject jsonObject : iterable) {
                    collector.collect(jsonObject);
                }
            }
        })
        .print("====");

From the printed output, I found that some records are missing from the
tumbling windows. I can't figure out why. Can anyone help me?

Re: When i use the Tumbling Windows, find lost some record

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi,

Can you share more details about what you mean by losing some records?
Can you share what data you are ingesting, what results you expect, and
what results you actually get? Without that it's impossible to help
you. So far your code looks rather correct.

Best,

Dawid
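
While the details asked for above are pending, one thing that may be worth ruling out is late data. This is only a guess, since the thread does not confirm the cause. With event time and no allowed lateness, a record whose window has already been passed by the watermark is dropped silently. Below is a minimal plain-Java sketch (no Flink dependency) that simulates this for the 30-second window and 3-minute bound from the job above. The class name, method names and sample timestamps are made up for illustration, and the sketch assumes the watermark advances after every record, whereas Flink emits it periodically.

```java
import java.util.ArrayList;
import java.util.List;

// Simulation of tumbling event-time windows with a bounded-out-of-orderness
// watermark and zero allowed lateness. Hypothetical helper, not Flink code.
class LateRecordDemo {

    // Start of the tumbling window containing the timestamp (window offset 0 assumed).
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    // Returns the timestamps that would be dropped as late, assuming the
    // watermark is updated after every record (a simplification: Flink
    // emits watermarks periodically, so a real job can be more lenient).
    static List<Long> droppedAsLate(long[] timestampsMs, long windowSizeMs, long maxOutOfOrdernessMs) {
        List<Long> dropped = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;
        for (long ts : timestampsMs) {
            long windowEnd = windowStart(ts, windowSizeMs) + windowSizeMs;
            // The window is considered finished once the watermark has
            // reached its last timestamp (windowEnd - 1).
            if (windowEnd - 1 <= watermark) {
                dropped.add(ts);
            }
            maxSeen = Math.max(maxSeen, ts);
            watermark = maxSeen - maxOutOfOrdernessMs;
        }
        return dropped;
    }

    public static void main(String[] args) {
        long window = 30_000L;   // Time.seconds(30)
        long bound = 180_000L;   // Time.minutes(3)
        // Made-up logTime values in milliseconds; 20_000 arrives after 250_000.
        long[] logTimes = {0L, 10_000L, 250_000L, 20_000L, 100_000L};
        System.out.println(droppedAsLate(logTimes, window, bound)); // prints [20000]
    }
}
```

In the sample, the record with logTime 20000 arrives after the watermark has moved to 70000, so its window [0, 30000) is already closed and the record is dropped, while 100000 is still accepted. In a real job, calling sideOutputLateData(...) with an OutputTag on the windowed stream (or setting allowedLateness(...)) is one way to check whether the missing records are being discarded as late.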
