Posted to user@flink.apache.org by Biplob Biswas <re...@gmail.com> on 2016/04/26 17:12:24 UTC

Return unique counter using groupReduceFunction

Hi,

I am using a GroupReduceFunction to aggregate the contents of the objects,
but at the same time I need to return a unique counter from the function.
My attempts are failing: the identifiers look random and are getting
duplicated.

Following is the part of my code which is supposed to generate a unique
counter and return it with out.collect.


	public static class sumReducer implements
			GroupReduceFunction<Tuple2<Integer, Point>, Tuple5<Integer, Point, Point, Long, Long>> {

		double sum[] = null;
		double sumOfSquare[] = null;
		long timestamp = 0;

		@Override
		public void reduce(Iterable<Tuple2<Integer, Point>> in,
				Collector<Tuple5<Integer, Point, Point, Long, Long>> out)
				throws Exception {

			int id = 0;
			long count = 0;
			boolean flag = true;
			for (Tuple2<Integer, Point> i : in)
			{
				if (flag)
				{
					timestamp++;
					System.out.println("uniqueid: " + i.f0 + ", t: " + timestamp);
					sum = new double[i.f1.pt.length];
					sumOfSquare = new double[sum.length];
					id = i.f0;
					for (int j = 0; j < sum.length; j++)
					{
						sum[j] = i.f1.pt[j];
						sumOfSquare[j] = i.f1.pt[j] * i.f1.pt[j];
					}
					flag = false;
				}
				else
				{
					int len = i.f1.pt.length;
					for (int j = 0; j < len; j++)
					{
						sum[j] += i.f1.pt[j];
						sumOfSquare[j] += (i.f1.pt[j] * i.f1.pt[j]);
					}
				}
				count++;
			}
			out.collect(new Tuple5<Integer, Point, Point, Long, Long>(id,
					new Point(sum), new Point(sumOfSquare), count, timestamp));
		}
	}

I want the timestamp to be unique. The statement
"System.out.println("uniqueid: " + i.f0 + ", t: " + timestamp );" executes
once for each identifier (given by i.f0) by which the data is grouped
before the GroupReduceFunction is called, yet I still get the following
output from that println statement.

uniqueid: 2, t: 1
uniqueid: 1, t: 1
uniqueid: 7, t: 2
uniqueid: 9, t: 3
uniqueid: 6, t: 2
uniqueid: 3, t: 1
uniqueid: 5, t: 2
uniqueid: 8, t: 3

I don't really understand this discrepancy; I am probably missing some
Flink concept. I am relatively new to the Flink platform, and any help is
appreciated. Thanks a lot.

Thanks and Regards



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Return-unique-counter-using-groupReduceFunction-tp6452.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Return unique counter using groupReduceFunction

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Biplob,

Flink is a distributed, data-parallel system, which means that several
instances of your ReduceFunction run in parallel, each with its own
timestamp counter.
If you want a unique timestamp, you have to set the parallelism of the
reduce operator to 1, but then the program might become inefficient.
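
To illustrate the point about per-instance counters, here is a small plain-Java simulation (no Flink involved). The names PerInstanceCounterDemo, ReducerInstance and run are made up for this sketch, and the round-robin assignment of groups to instances is an assumption chosen for simplicity; Flink itself partitions groups by key. With three instances, the counter values repeat much like in the output you posted; with one instance, they are unique.

```java
import java.util.ArrayList;
import java.util.List;

public class PerInstanceCounterDemo {

    /** Stand-in for one parallel instance of the reduce function (hypothetical). */
    static class ReducerInstance {
        // Instance field: every parallel instance has its own copy.
        private long timestamp = 0;

        /** Called once per group; returns the counter value emitted for that group. */
        long reduceGroup() {
            timestamp++;
            return timestamp;
        }
    }

    /**
     * Hands out `groups` groups round-robin to `parallelism` instances
     * (an assumption for this sketch) and records what each call emits.
     */
    static List<Long> run(int groups, int parallelism) {
        ReducerInstance[] instances = new ReducerInstance[parallelism];
        for (int p = 0; p < parallelism; p++) {
            instances[p] = new ReducerInstance();
        }
        List<Long> emitted = new ArrayList<>();
        for (int g = 0; g < groups; g++) {
            emitted.add(instances[g % parallelism].reduceGroup());
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Three instances: each counts on its own, so values repeat.
        System.out.println(run(8, 3)); // [1, 1, 1, 2, 2, 2, 3, 3]
        // One instance: a single counter, so values are unique.
        System.out.println(run(8, 1)); // [1, 2, 3, 4, 5, 6, 7, 8]
    }
}
```

The eight values in your output (three 1s, three 2s, two 3s) would be consistent with three parallel instances, although that is only a guess from the numbers.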

Maybe DataSetUtils.zipWithIndex() or DataSetUtils.zipWithUniqueId() are
helpful for your use case.
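
If you would rather keep the counter inside the function, one simple way to make per-instance counters globally unique without reducing the parallelism is to combine the local counter with the index of the parallel instance. The plain-Java sketch below shows only the arithmetic; UniqueIdSketch, uniqueId and idsFor are hypothetical names, and this is not meant as a description of how the DataSetUtils methods are implemented. In a RichGroupReduceFunction the two inputs could presumably be taken from getRuntimeContext().getIndexOfThisSubtask() and getRuntimeContext().getNumberOfParallelSubtasks().

```java
import java.util.Set;
import java.util.TreeSet;

public class UniqueIdSketch {

    /**
     * Interleaves local counters of different subtasks:
     * subtask s produces s, s + p, s + 2p, ... for parallelism p.
     * Ids from different subtasks never collide because subtaskIndex < parallelism.
     */
    static long uniqueId(long localCounter, int subtaskIndex, int parallelism) {
        return localCounter * parallelism + subtaskIndex;
    }

    /** Collects the ids produced when every subtask handles `groupsPerSubtask` groups. */
    static Set<Long> idsFor(int groupsPerSubtask, int parallelism) {
        Set<Long> ids = new TreeSet<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            for (long counter = 0; counter < groupsPerSubtask; counter++) {
                ids.add(uniqueId(counter, subtask, parallelism));
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        // 3 subtasks x 3 groups each: 9 distinct ids.
        System.out.println(idsFor(3, 3)); // [0, 1, 2, 3, 4, 5, 6, 7, 8]
    }
}
```

Ids generated this way are unique but not necessarily consecutive when the subtasks process different numbers of groups. If you need consecutive numbers, zipWithIndex() is the better fit.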

Best, Fabian

