Posted to user@flink.apache.org by HG <ha...@gmail.com> on 2022/03/03 14:12:24 UTC

How to sort Iterable in ProcessWindowFunction?

Hi,
I need to sort the input of the ProcessWindowFunction by one of the
fields of the Tuple4 elements in the Iterable.

static class MyProcessWindowFunction extends
        ProcessWindowFunction<Tuple4<Long, Long, String, String>, String, String, TimeWindow> {
    @Override
    public void process(String key, Context context,
            Iterable<Tuple4<Long, Long, String, String>> input, Collector<String> out) {
        long elapsed;
        long pHandlingTime = 0L;   // f1 of the previous element
        long totalElapsed  = 0L;

        System.out.println(input.getClass());

        // This loop should see the elements sorted by one of the Tuple4 fields.
        for (Tuple4<Long, Long, String, String> in : input) {
            String transactionId = in.f2;
            elapsed       = in.f1 - pHandlingTime;
            totalElapsed  = totalElapsed + elapsed;
            pHandlingTime = in.f1;

            // The posted snippet also printed "max handling time : " + h.toString(),
            // but h is not defined anywhere in it, so that part is left out here.
            out.collect("Key : " + key + " Window : " + context.window()
                    + "  transactionId : " + transactionId
                    + "  elapsed : " + elapsed
                    + "  totalElapsed " + totalElapsed);
        }
    }
}

Re: How to sort Iterable in ProcessWindowFunction?

Posted by HG <ha...@gmail.com>.
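And here is the comparator function.

The order of the return values matters: returning 1, 0, -1 (for greater,
equal, less), as in the code below, sorts ascending.
Returning them in the order -1, 0, 1 sorts descending, I discovered.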

public static class SortEventsHandlingTime implements
        Comparator<Tuple4<Long, Long, String, String>> {

    // Compare two Tuple4 objects by their first field (f0).
    @Override
    public int compare(Tuple4<Long, Long, String, String> o1,
            Tuple4<Long, Long, String, String> o2) {
        long a = o1.f0;
        long b = o2.f0;
        if (a > b) {
            return 1;
        } else if (a == b) {
            return 0;
        } else {
            return -1;
        }
    }
}
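
To see the effect of the sign convention in isolation, here is a minimal sketch that runs without Flink. Row is a hypothetical stand-in for the Tuple4 (only f0 and f2 are modelled), and ComparatorOrderDemo and labelsSortedBy are names made up for this illustration. Comparator.comparingLong and reversed() are standard java.util.Comparator methods.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for Tuple4<Long, Long, String, String>; only f0 and f2 are modelled.
final class Row {
    final long f0;
    final String f2;

    Row(long f0, String f2) {
        this.f0 = f0;
        this.f2 = f2;
    }
}

final class ComparatorOrderDemo {
    // Ascending: negative when a < b, zero when equal, positive when a > b.
    static final Comparator<Row> ASCENDING = Comparator.comparingLong(r -> r.f0);
    // Descending: the same comparison with its sign flipped.
    static final Comparator<Row> DESCENDING = ASCENDING.reversed();

    // Sorts a copy of the rows with the given comparator and returns their f2 labels.
    static List<String> labelsSortedBy(List<Row> rows, Comparator<Row> order) {
        List<Row> copy = new ArrayList<>(rows);
        copy.sort(order);
        List<String> labels = new ArrayList<>();
        for (Row r : copy) {
            labels.add(r.f2);
        }
        return labels;
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(new Row(30L, "c"), new Row(10L, "a"), new Row(20L, "b"));
        System.out.println(labelsSortedBy(rows, ASCENDING));
        System.out.println(labelsSortedBy(rows, DESCENDING));
    }
}
```

With a real Tuple4 the same idea would presumably read Comparator.comparingLong(t -> t.f0), which avoids writing the three return branches by hand.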


On Mon, 7 Mar 2022 at 03:05, yidan zhao <hi...@gmail.com> wrote:

> Collect the elements to a list, then sort, then collect out.

Re: How to sort Iterable in ProcessWindowFunction?

Posted by HG <ha...@gmail.com>.
For the record, so that other inexperienced people may benefit too 😬

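// Copy the window contents into a list so they can be sorted.
List<Tuple4<Long, Long, String, String>> inputList = new ArrayList<>();

input.forEach(inputList::add);
inputList.sort(new SortEventsHandlingTime());

for (Tuple4<Long, Long, String, String> in : inputList) {
    // ... same loop body as in process() from the original post
}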
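
As a rough illustration of why the order matters for the elapsed calculation, the sketch below sorts by f0 and then takes the difference between consecutive f1 values, as the loop in the original post does. It runs without Flink: Sample is a hypothetical stand-in for the Tuple4 (only f0 and f1 are modelled), and ElapsedDemo and elapsedInOrder are invented names. Note that, as in the original loop, the first difference is taken against 0.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for Tuple4<Long, Long, String, String>; only f0 and f1 are modelled.
final class Sample {
    final long f0;   // sort key
    final long f1;   // value whose consecutive differences are computed

    Sample(long f0, long f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}

final class ElapsedDemo {
    // Sorts a copy of the input by f0, then returns f1 minus the previous f1 for each element.
    static List<Long> elapsedInOrder(Iterable<Sample> input) {
        List<Sample> sorted = new ArrayList<>();
        input.forEach(sorted::add);
        sorted.sort(Comparator.comparingLong(s -> s.f0));

        List<Long> elapsed = new ArrayList<>();
        long previous = 0L;   // same starting value as pHandlingTime in the original loop
        for (Sample s : sorted) {
            elapsed.add(s.f1 - previous);
            previous = s.f1;
        }
        return elapsed;
    }

    public static void main(String[] args) {
        List<Sample> window = Arrays.asList(
                new Sample(2L, 150L), new Sample(1L, 100L), new Sample(3L, 175L));
        System.out.println(elapsedInOrder(window));
    }
}
```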


On Mon, 7 Mar 2022 at 03:05, yidan zhao <hi...@gmail.com> wrote:

> Collect the elements to a list, then sort, then collect out.

Re: How to sort Iterable in ProcessWindowFunction?

Posted by yidan zhao <hi...@gmail.com>.
Collect the elements to a list, then sort, then collect out.
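
A minimal sketch of those three steps in plain Java, so it can be run without Flink. The names SortThenEmit and sortAndEmit are made up for illustration, and a java.util.function.Consumer stands in for Flink's Collector; inside a real process() method the last step would presumably call out.collect(...) instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

final class SortThenEmit {
    // Copies the elements into a list, sorts the list, then hands each element to out in order.
    static <T> void sortAndEmit(Iterable<T> input, Comparator<? super T> order,
            Consumer<? super T> out) {
        List<T> buffer = new ArrayList<>();
        input.forEach(buffer::add);   // 1. collect the elements to a list
        buffer.sort(order);           // 2. sort
        buffer.forEach(out);          // 3. emit in sorted order
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(3, 1, 2);
        Comparator<Integer> order = Comparator.naturalOrder();
        sortAndEmit(input, order, value -> System.out.println(value));
    }
}
```

The copy means every element of the window is held in one list at the same time, which is worth keeping in mind for very large windows.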

On Thu, 3 Mar 2022 at 22:13, HG <ha...@gmail.com> wrote:

> Hi,
> I need to sort the input of the ProcessWindowFunction by one of the
> fields of the Tuple4 elements in the Iterable.
>
> Any advice as to what the best way is?

Re: How to sort Iterable in ProcessWindowFunction?

Posted by HG <ha...@gmail.com>.
Hi,
I need to sort the input of the ProcessWindowFunction by one of the
fields of the Tuple4 elements in the Iterable.

Any advice as to what the best way is?
