Posted to user@flink.apache.org by Biplob Biswas <re...@gmail.com> on 2016/04/26 17:34:41 UTC

Regarding Broadcast of datasets in streaming context

Hi, I have yet another question, this time about maintaining a global list of
centroids.

I am trying to implement the CluStream algorithm, and for that purpose I have
the initial set of centres in a Flink DataSet. Now I need to update the set
of centres for every data tuple that comes from the stream. From what I have
read so far in two different posts with similar questions, the advice for
streaming data was to use the co-map operator and retrieve the two inputs in
two separate map functions.

My idea is to broadcast the dataset to each Flink partition and, whenever a
data tuple is mapped to a partition by a map function, update the
broadcast dataset.
But as this is currently not possible, I was thinking of broadcasting the
DataStream using

"ds.broadcast()"

so that every partition receives the streamed tuple. Then I would use a normal
flatMap function for the centres, use the broadcast tuple to update the
centres, and return the updated set of centres.

My question is: would this work? If yes, could someone give an example of the
DataStream broadcast function and show how to retrieve the broadcast stream in
a map function?
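Going by the `broadcast()` documentation quoted later in this thread, there is nothing separate to "retrieve": broadcast only changes how elements are partitioned, and every parallel instance of the next operator receives each element as an ordinary input to its map/flatMap method. The sketch below is not Flink code; it is a minimal single-JVM model (the `Routing` class and its methods are hypothetical names) that contrasts broadcast with the round-robin distribution that `rebalance()` performs.

```java
import java.util.ArrayList;
import java.util.List;

// Single-JVM model of two partitioning strategies (NOT the Flink API).
// Each inner list stands for the elements that one parallel instance of the
// downstream operator would receive in its map()/flatMap() method.
final class Routing {

    // broadcast(): every element is replicated to every downstream instance.
    static <T> List<List<T>> broadcast(List<T> elements, int parallelism) {
        List<List<T>> instances = new ArrayList<>();
        for (int p = 0; p < parallelism; p++) {
            instances.add(new ArrayList<>(elements));
        }
        return instances;
    }

    // Round-robin (like rebalance()): each instance sees only a subset.
    static <T> List<List<T>> roundRobin(List<T> elements, int parallelism) {
        List<List<T>> instances = new ArrayList<>();
        for (int p = 0; p < parallelism; p++) {
            instances.add(new ArrayList<>());
        }
        for (int i = 0; i < elements.size(); i++) {
            instances.get(i % parallelism).add(elements.get(i));
        }
        return instances;
    }

    public static void main(String[] args) {
        List<Integer> points = List.of(1, 2, 3, 4);
        System.out.println("broadcast:   " + broadcast(points, 2));
        System.out.println("round-robin: " + roundRobin(points, 2));
    }
}
```

With parallelism 2, both modelled instances see all four points under broadcast, whereas round-robin gives each instance two of them. In a real job the downstream function simply gets every broadcast element in its normal `map` call.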



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Broadcast-of-datasets-in-streaming-context-tp6456.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Regarding Broadcast of datasets in streaming context

Posted by Biplob Biswas <re...@gmail.com>.
Hi,

I read that article already, but it is very simplistic. Based on that
article and other examples, I was trying to understand how my centroids can
be sent to all the partitions and updated accordingly.

I also understood that the order of the input and the feedback stream can't
be determined, but I was expecting the centroids to be broadcast after every
collect call so that all the partitions receive updated values.

But now I am confused about how this entire iteration and broadcast setup can
even help me maintain a central state of centroids.

I have even tried something similar to this: 

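import java.util.Arrays;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

// `env` is the StreamExecutionEnvironment created earlier in the program.
DataStream<Long> mainInput = env.generateSequence(2, 30);
// Not used below; `i` is assumed to be defined elsewhere in the original program.
DataStream<Long> initialIterateInput = env.fromElements(i);

IterativeStream.ConnectedIterativeStreams<Long, Long[]> iteration =
        mainInput.iterate().withFeedbackType(Long[].class);

DataStream<Long[]> iterateHead = iteration
        .flatMap(new CoFlatMapFunction<Long, Long[], Long[]>() {
            long globalVal = 1;
            Long[] arr;  // local copy of the "centroids"
            int i = 0;   // next free slot in arr

            @Override
            public void flatMap1(Long value, Collector<Long[]> out) throws Exception {
                if (arr == null) {
                    arr = new Long[10]; // allocate once, not on every call
                }
                Thread.sleep(1000);
                // generateSequence(2, 30) yields 29 values but arr has only 10 slots.
                if (i < arr.length) {
                    arr[i] = value;
                    i++;
                }
                System.out.println("SEEING FROM INPUT 1: " + Arrays.toString(arr) + ", " + globalVal);
                // Emit a copy so later local changes don't alter the record already sent.
                out.collect(Arrays.copyOf(arr, arr.length));
            }

            @Override
            public void flatMap2(Long[] value, Collector<Long[]> out) throws Exception {
                // Feedback may arrive before this instance has seen any input.
                if (arr == null) {
                    arr = new Long[10];
                }
                Thread.sleep(1000);
                for (int j = 0; j < Math.min(value.length, arr.length); j++) {
                    arr[j] = value[j];
                }
                System.out.println("SEEING FROM INPUT 2: " + Arrays.toString(arr) + ", " + globalVal);

                //out.collect(value);
            }
        });

iteration.closeWith(iterateHead.broadcast());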

where arr is the array of my centroids and the value in the first map
function would be the points coming from the input stream.
So I ran this example on a small streaming scenario and looked at the
results being printed.

I started working on this based on the idea that a collect happens and
then, on each iteration, the broadcast supplies the latest centroids for
each point.

That's why I am constantly asking you and giving you updates on what I
did and what I am doing, but unless I understand how this central state of
centroids is emulated, I can't proceed.

So I would ask you to provide a small example, a snippet, or anything else
that helps me understand how you propose to keep a central state and when to
update it. Without this basic understanding I am not able to do anything.

Thanks a lot.

Regards
Biplob




Re: Regarding Broadcast of datasets in streaming context

Posted by Gyula Fóra <gy...@gmail.com>.
Hi,

If you haven't done so already, please read the relevant part of the
streaming docs:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html#iterations

Iterations just allow you to define cyclic flows; there is nothing magic
about them. If your original input stream is finite, there is no guarantee
about the order of your input and feedback streams, so it can easily happen
that the original input is consumed before any feedback is received.

Also, broadcast again has nothing to do with iterations themselves. It is a
partitioning pattern, which just means that each tuple sent will be received
by all downstream instances. You have to work around these abstractions.

Cheers,
Gyula
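Since neither the interleaving of input and feedback nor the order of feedback records is guaranteed, the receiving side has to tolerate updates that arrive late or out of order. One simple policy, sketched here in plain Java as an assumption rather than anything Flink provides, is to tag each centroid update with a version number and drop any update that is not newer than the one already held. `CentroidUpdate` and `VersionedCentroids` are hypothetical names, and the policy only makes sense if versions of the same centroid are comparable, for example when a single instance owns each centroid id.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical update message sent over the feedback edge: a centroid id,
// its new center, and a version that grows each time the centroid is re-emitted.
final class CentroidUpdate {
    final int centroidId;
    final long version;
    final double[] center;

    CentroidUpdate(int centroidId, long version, double[] center) {
        this.centroidId = centroidId;
        this.version = version;
        this.center = center.clone();
    }
}

// Keeps only the newest version of each centroid, so an update that arrives
// late cannot overwrite a newer one (plain Java, not the Flink API).
final class VersionedCentroids {
    private final Map<Integer, CentroidUpdate> latest = new HashMap<>();

    /** Applies the update if it is newer than the stored one; returns whether it was applied. */
    boolean apply(CentroidUpdate update) {
        CentroidUpdate current = latest.get(update.centroidId);
        if (current != null && current.version >= update.version) {
            return false; // stale or duplicate update: ignore it
        }
        latest.put(update.centroidId, update);
        return true;
    }

    /** Current center of a centroid, or null if no update has arrived yet. */
    double[] center(int centroidId) {
        CentroidUpdate u = latest.get(centroidId);
        return u == null ? null : u.center.clone();
    }
}
```

For example, if version 2 of centroid 0 arrives before version 1, `apply` accepts version 2 and then rejects version 1, so the stored center keeps the newer value.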

In reply to Biplob Biswas <re...@gmail.com>, Sun, 15 May 2016, 17:01.


Re: Regarding Broadcast of datasets in streaming context

Posted by Biplob Biswas <re...@gmail.com>.
Hi Gyula,

even after trying different things, I can't seem to get a hold of things.
Also, I asked another question about how iteration and streaming work
here:
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Unexpected-behaviour-in-datastream-broadcast-td6848.html>

It's not working the way I expect: the input stream is completely
consumed before anything is sent back and iterated.

Could you please point me in the right direction and help me understand
things properly?

Thanks and Regards
Biplob Biswas




Re: Regarding Broadcast of datasets in streaming context

Posted by Biplob Biswas <re...@gmail.com>.
Hi Gyula,

I tried doing something like the following in the two flatMaps, but I am not
getting the desired results and am still confused about how the concept you
put forward would work:

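public static final class MyCoFlatmap implements CoFlatMapFunction<Point, Centroid, Centroid> {

    // numofMC, timestamp, CentroidCreator, distance(...) and getRadius(...)
    // are assumed to be defined elsewhere in the original program.
    Centroid[] centroids;
    boolean flag = true; // true until the centroid array has been allocated
    int id = 0;          // number of centroids this instance has created

    private void ensureInitialized() {
        if (flag) {
            centroids = new Centroid[numofMC];
            flag = false;
        }
    }

    @Override
    public void flatMap1(Point in, Collector<Centroid> out) throws Exception {
        ensureInitialized();
        if (id < numofMC) {
            // The first numofMC points each seed a new centroid.
            System.out.println(id);
            Centroid generatedMC = CentroidCreator.generateCentroid(id, timestamp, in);
            centroids[id] = generatedMC;
            out.collect(generatedMC);
            id++;
        } else {
            // Find the centroid closest to the incoming point.
            Centroid closestMC = null;
            double minDistance = Double.MAX_VALUE;
            for (Centroid mc : centroids) {
                double dist = distance(in.pt, mc.getCenter());
                if (dist < minDistance) {
                    closestMC = mc;
                    minDistance = dist;
                }
            }
            // Absorb the point only if it lies within that centroid's radius.
            double radius = getRadius(closestMC, centroids);
            if (minDistance < radius) {
                closestMC.insert(in.pt, timestamp);
            }
            out.collect(closestMC);
        }
    }

    @Override
    public void flatMap2(Centroid in, Collector<Centroid> out) throws Exception {
        // Feedback can arrive before this instance has seen any point.
        ensureInitialized();
        centroids[in.id] = in;
        System.out.println("MC: " + in.toString());
    }
}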

As mentioned in my previous reply, I understand that each of the map
functions in the co-flatMap receives one tuple at a time. So if I have a
DataStream of centroids, they would arrive one at a time at the partitions,
and that would defeat the purpose, because I need all of the centroids to
compute the distances.

I tried storing the centroids in an array of Centroid, but again I don't
understand how I can push all of the changes back.

A small example or code snippet would really be helpful.

Thanks a lot

Regards
Biplob
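The per-point update itself can stay entirely local to each parallel instance, with no coordination. The plain-Java sketch below models that local step, loosely following the `flatMap1` logic above: find the closest centroid and absorb the point only if it lies within a radius. `LocalCentroids`, the caller-supplied fixed radius, and the running-mean update (each initial center counted as one point) are simplifying assumptions; real CluStream micro-clusters keep richer summary statistics and derive the radius from them.

```java
// Per-instance centroid state, updated one point at a time with no
// coordination between parallel instances (plain Java, not the Flink API).
final class LocalCentroids {
    final double[][] centers; // one row per centroid
    final long[] counts;      // points represented by each centroid

    LocalCentroids(double[][] initialCenters) {
        centers = new double[initialCenters.length][];
        counts = new long[initialCenters.length];
        for (int c = 0; c < initialCenters.length; c++) {
            centers[c] = initialCenters[c].clone();
            counts[c] = 1; // assumption: each initial center counts as one point
        }
    }

    static double distance(double[] a, double[] b) {
        double sum = 0;
        for (int d = 0; d < a.length; d++) {
            double diff = a[d] - b[d];
            sum += diff * diff;
        }
        return Math.sqrt(sum);
    }

    /** Returns the index of the closest centroid; absorbs the point if it is within radius. */
    int update(double[] point, double radius) {
        int closest = 0;
        double minDistance = Double.MAX_VALUE;
        for (int c = 0; c < centers.length; c++) {
            double dist = distance(point, centers[c]);
            if (dist < minDistance) {
                minDistance = dist;
                closest = c;
            }
        }
        if (minDistance < radius) {
            counts[closest]++;
            // Running mean: move the center towards the point by 1/count.
            for (int d = 0; d < point.length; d++) {
                centers[closest][d] += (point[d] - centers[closest][d]) / counts[closest];
            }
        }
        return closest;
    }
}
```

With centers (0, 0) and (10, 10) and radius 5, the point (1, 1) is absorbed by the first centroid, moving it to (0.5, 0.5), while (20, 20) is assigned to the second centroid but lies outside the radius and leaves it unchanged.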




Re: Regarding Broadcast of datasets in streaming context

Posted by Biplob Biswas <re...@gmail.com>.
This is exactly what I am confused about. If I understand it correctly, each
of the map functions in the co-flatMap receives one tuple at a time, so if I
have a DataStream of centroids, they would arrive one at a time at the
partitions, and that would defeat the purpose.

Are you proposing that I put the entire list of centroids into a single
DataStream element so that the map function gets the entire list whenever it
is called?

Would it be possible for you to give an example, a code snippet, or a
link to a use case of the co-flatMap function?

Thanks a lot for your help throughout.

Regards
Biplob Biswas 
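On the question of sending the whole list: in Gyula's outline elsewhere in this thread the feedback type is `Centroids`, which suggests that one feedback record can carry a complete set, so `flatMap2` would receive all of one sender's centroids in a single call rather than one at a time. Below is a minimal plain-Java sketch of such a record, assuming every instance keeps the same fixed number of centroids. `CentroidSnapshot` is a hypothetical name, and the count-weighted average in `merge` is just one possible way to combine snapshots from different instances, not something the thread prescribes.

```java
// Hypothetical feedback record: a full copy of one instance's centroids,
// so the receiving side gets the whole set in a single flatMap2 call.
final class CentroidSnapshot {
    final int senderId;       // which parallel instance produced it
    final double[][] centers; // deep copy, safe from later local mutation
    final long[] counts;      // weight of each centroid

    CentroidSnapshot(int senderId, double[][] centers, long[] counts) {
        this.senderId = senderId;
        this.centers = new double[centers.length][];
        for (int c = 0; c < centers.length; c++) {
            this.centers[c] = centers[c].clone();
        }
        this.counts = counts.clone();
    }

    /** Count-weighted average of two snapshots with the same centroid layout. */
    static double[][] merge(CentroidSnapshot a, CentroidSnapshot b) {
        double[][] merged = new double[a.centers.length][];
        for (int c = 0; c < a.centers.length; c++) {
            long total = a.counts[c] + b.counts[c];
            merged[c] = new double[a.centers[c].length];
            for (int d = 0; d < merged[c].length; d++) {
                merged[c][d] = total == 0
                        ? a.centers[c][d]
                        : (a.centers[c][d] * a.counts[c] + b.centers[c][d] * b.counts[c]) / total;
            }
        }
        return merged;
    }
}
```

The constructor deep-copies its arrays, so an instance can keep mutating its local centroids after emitting a snapshot without changing what it already sent.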



Re: Regarding Broadcast of datasets in streaming context

Posted by Gyula Fóra <gy...@gmail.com>.
Hi,

Iterating after every incoming point/centroid update means that you
basically defeat the purpose of having parallelism in your Flink job.

If you only "sync" the centroids periodically via the broadcast, your program
can run efficiently in parallel. This should be fine for machine learning use
cases, where the results should converge anyway.

Gyula
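To make the difference between syncing after every point and syncing periodically concrete, here is a minimal single-JVM model, not Flink code, of the pattern discussed in this thread: each worker updates its own centroids per point (the role of `flatMap1`), emits a snapshot only every `syncEvery` points, and every worker merges each emitted snapshot (the role of `flatMap2` behind a broadcast feedback edge). `PeriodicSync`, the 1-D centroids, the fixed update step, the 50/50 merge, and the immediate in-order delivery are all simplifying assumptions; a real feedback edge delivers asynchronously and in no guaranteed order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Single-JVM model of the periodic-sync pattern (NOT Flink code).
final class PeriodicSync {

    static final class Worker {
        final double[] centroids; // 1-D centroids, for brevity
        final int syncEvery;
        int seen = 0;

        Worker(double[] initial, int syncEvery) {
            this.centroids = initial.clone();
            this.syncEvery = syncEvery;
        }

        // Analogue of flatMap1: local update; returns a snapshot only periodically.
        double[] onPoint(double x) {
            int closest = 0;
            for (int c = 1; c < centroids.length; c++) {
                if (Math.abs(x - centroids[c]) < Math.abs(x - centroids[closest])) {
                    closest = c;
                }
            }
            centroids[closest] += 0.5 * (x - centroids[closest]); // fixed step (assumption)
            seen++;
            return seen % syncEvery == 0 ? centroids.clone() : null;
        }

        // Analogue of flatMap2: merge centroids received over the feedback edge.
        void onCentroids(double[] other) {
            for (int c = 0; c < centroids.length; c++) {
                centroids[c] = 0.5 * (centroids[c] + other[c]);
            }
        }
    }

    /** Routes points round-robin to workers; returns how many snapshots were broadcast. */
    static int run(List<Worker> workers, double[] points) {
        int broadcasts = 0;
        for (int i = 0; i < points.length; i++) {
            double[] snapshot = workers.get(i % workers.size()).onPoint(points[i]);
            if (snapshot != null) {
                broadcasts++;
                for (Worker w : workers) {
                    w.onCentroids(snapshot); // broadcast: every worker receives it
                }
            }
        }
        return broadcasts;
    }

    public static void main(String[] args) {
        List<Worker> workers = new ArrayList<>();
        for (int p = 0; p < 2; p++) {
            workers.add(new Worker(new double[] {0.0, 10.0}, 2));
        }
        int n = run(workers, new double[] {1, 9, 1, 9, 1, 9, 1, 9});
        System.out.println(n + " broadcasts for 8 points");
        for (Worker w : workers) {
            System.out.println(Arrays.toString(w.centroids));
        }
    }
}
```

With two workers and eight points, `syncEvery = 2` produces four broadcasts, whereas `syncEvery = 1` (syncing after every point) produces eight. With higher parallelism each broadcast also fans out to more instances, which is why syncing after every point limits parallelism.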

Biplob Biswas <re...@gmail.com> wrote on Mon, 2 May 2016, 17:02:

> Hi Gyula,
>
> Could you explain a bit why I wouldn't want the centroids to be collected
> after every point?
>
> I mean, once I get a streamed point via the map1 function, I would want to
> compare the distance of the point with a centroid which arrives via the map2
> function, and I keep on comparing for every centroid which comes in
> subsequently. Once the update of the centroid happens, shouldn't I collect
> the entire set? That is, update a centroid and collect it back for the
> next point in the iteration.
>
> I may not be getting the concept properly here, so an example snippet would
> help in the long run.
>
> Thanks & Regards
> Biplob
> Gyula Fóra wrote
> > Hey,
> >
> > I think you got the good idea :)
> >
> > So your coflatmap will get all the centroids that you have sent to the
> > stream in the closeWith call. This means that whenever you collect a new
> > set of centroids they will be iterated back. This means you don't always
> > want to send the centroids out on the collector, only periodically.
> >
> > The order in which these come is pretty much arbitrary, so you need to
> > add some logic by which you can order them if this is important.
> >
> > I'm not sure if this helped or not :D
> >
> > Gyula
> >
> > Biplob Biswas <revolutionisme@...> wrote on Mon, 2 May 2016, 13:13:
> >
> >> Hi Gyula,
> >>
> >> I understand more now about how this might work, and it's fascinating.
> >> However, I still have one question about the coflatmap function.
> >>
> >> First, let me explain what I understand, and whether it's correct or not:
> >> 1. The connected iterative stream ensures that the coflatmap function
> >> receives the points and the centroids which are broadcast on each
> >> iteration defined by closeWith.
> >>
> >> 2. So in the coflatmap function, one map function gets the points and the
> >> other map function gets the centroids which are broadcast.
> >>
> >> Now comes the part I am partly assuming, because I don't understand it
> >> from the theory.
> >> 3. Assuming I can use the broadcast centroids, I calculate the nearest
> >> centroid to the streamed point, update that centroid, and use only one
> >> of the collectors to return the updated list of centroids.
> >>
> >>
> >> My question here is: I am assuming that this operation is not done in
> >> parallel, because if the stream is processed in parallel, how would I
> >> ensure a correct update of the centroids when multiple points can try to
> >> update the same centroid in parallel?
> >>
> >> I hope I made myself clear with this.
> >>
> >> Thanks and Regards
> >> Biplob
> >> Biplob Biswas wrote
> >> > Hi Gyula,
> >> >
> >> > I read your workaround and started reading about Flink iterations,
> >> > coflatmap operators and other things. Now I do understand a few things,
> >> > but the solution you provided is not completely clear to me.
> >> >
> >> > I understand the following things from your post.
> >> > 1. You initially have a DataStream of points, on which you iterate, and
> >> > 'withFeedbackType' defines the type of the connected stream, so rather
> >> > than "Points" the type is "Centroids" now.
> >> >
> >> > 2. On this connected stream (which, as I understand it, only has the
> >> > streamed points right now), you run a flatMap operator. And you mention:
> >> > "MyCoFlatmap would be a CoFlatMapFunction which on 1 input receive events
> >> > and update its local centroids (and periodically output the centroids) and
> >> > on the other input would send centroids of other flatmaps and would merge
> >> > them to the local."
> >> > I don't understand this part completely. If I am not wrong, you are
> >> > saying that the co-flatMap function would have two map functions, but I
> >> > don't understand what specifically I am doing in each map function.
> >> >
> >> > 3. Lastly, the updated centroids that come back from the coflatmap
> >> > function are fed back into the stream, and this is the part where I get
> >> > lost again: how are these centroids fed back, and if they are fed back,
> >> > what happens to the point stream? And if they are somehow fed back, how
> >> > do I catch them in the coflatmap function?
> >> >
> >> >
> >> > If I understand this a bit, then in your code the first set of
> >> > centroids is created in the coflatmap function and you don't already
> >> > have a list of centroids to start with? Am I assuming correctly?
> >> >
> >> > I went through the iteration in the KMeans example at the following
> >> > link:
> >> >
> >> > https://github.com/stratosphere/stratosphere/blob/master/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/clustering/KMeans.java
> >> >
> >> > and I understand how that works, but I am still not clear on how your
> >> > example works.
> >> >
> >> > Could you please explain it a bit more, maybe with some examples?
> >> >
> >> > Thanks a lot.
> >> > Gyula Fóra-2 wrote
> >> >> Hi Biplob,
> >> >>
> >> >> I have implemented a similar algorithm to the one Aljoscha mentioned.
> >> >>
> >> >> First, let me clarify the following:
> >> >> There is currently no abstraction for keeping objects (in your case
> >> >> centroids) in a centralized way that can be updated/read by all
> >> >> operators. This would probably be very costly and is actually not
> >> >> necessary in your case.
> >> >>
> >> >> Broadcasting a stream, in contrast with other partitioning methods, means
> >> >> that the events will be replicated to all downstream operators. It is
> >> >> not a magical operator that will make state available among parallel
> >> >> instances.
> >> >>
> >> >> Now let me explain what I think you want from Flink and how to do it :)
> >> >>
> >> >> You have an input data stream and a set of centroids to be updated based
> >> >> on the incoming records. As you want to do this in parallel, you have an
> >> >> operator (let's say a flatmap) that keeps the centroids locally and
> >> >> updates them on its inputs. Now you have a set of independently updated
> >> >> centroids, so you want to merge them and update the centroids in each
> >> >> flatmap.
> >> >>
> >> >> Let's see how to do this. Given that you have your centroids locally,
> >> >> updating them is super easy, so I will not talk about that. The
> >> >> problematic part is periodically merging and "broadcasting" the
> >> >> centroids so that all the flatmaps eventually see the same ones (they
> >> >> don't always have to be the same for clustering, probably). There is no
> >> >> operator for sending state (centroids) between subtasks, so you have to
> >> >> be clever here. We can actually use cyclic streams to solve this problem
> >> >> by sending the centroids as simple events to a CoFlatMap:
> >> >>
> >> >> DataStream<Point> input = ...
> >> >> ConnectedIterativeStreams<Point, Centroids> inputsAndCentroids =
> >> >>     input.iterate().withFeedbackType(Centroids.class);
> >> >> DataStream<Centroids> updatedCentroids =
> >> >>     inputsAndCentroids.flatMap(new MyCoFlatmap());
> >> >> inputsAndCentroids.closeWith(updatedCentroids.broadcast());
> >> >>
> >> >> MyCoFlatmap would be a CoFlatMapFunction which on 1 input receive events
> >> >> and update its local centroids (and periodically output the centroids)
> >> >> and on the other input would send centroids of other flatmaps and would
> >> >> merge them to the local.
> >> >>
> >> >> This might be a lot to take in at first, so you might want to read up on
> >> >> streaming iterations and connected streams before you start.
> >> >>
> >> >> Let me know if this makes sense.
> >> >>
> >> >> Cheers,
> >> >> Gyula
> >> >>
> >> >>
> >> >> Biplob Biswas <revolutionisme@...> wrote on Thu, 28 Apr 2016, 14:41:
> >> >>
> >> >>> That would really be great, any example would help me proceed with my
> >> >>> work.
> >> >>> Thanks a lot.
> >> >>>
> >> >>> Aljoscha Krettek wrote
> >> >>> > Hi Biplob,
> >> >>> > one of our developers had a stream clustering example a while back.
> >> >>> > It was using a broadcast feedback edge with a co-operator to update
> >> >>> > the centroids.
> >> >>> > I'll directly include him in the email so that he will notice and
> >> >>> > can send you the example.
> >> >>> >
> >> >>> > Cheers,
> >> >>> > Aljoscha
> >> >>> >
> >> >>> > On Thu, 28 Apr 2016 at 13:57 Biplob Biswas <revolutionisme@...> wrote:
> >> >>> >
> >> >>> >> I am pretty new to Flink, so can anyone at least give me an
> >> >>> >> example of how the datastream.broadcast() method works? From the
> >> >>> >> documentation I get the following:
> >> >>> >>
> >> >>> >> broadcast()
> >> >>> >> Sets the partitioning of the DataStream so that the output elements
> >> >>> >> are broadcasted to every parallel instance of the next operation.
> >> >>> >>
> >> >>> >> If the output elements are broadcast, then how are they retrieved?
> >> >>> >> Or maybe I am looking at this method in a completely wrong way?
> >> >>> >>
> >> >>> >> Thanks
> >> >>> >> Biplob Biswas
> >> >>> >>
> >> >>> >>
> >> >>> >>
> >> >>> >> --
> >> >>> >> View this message in context:
> >> >>> >>
> >> >>>
> >>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Broadcast-of-datasets-in-streaming-context-tp6456p6543.html
> >> >>> >> Sent from the Apache Flink User Mailing List archive. mailing
> list
> >> >>> >> archive
> >> >>> >> at Nabble.com.
> >> >>> >>
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>> --
> >> >>> View this message in context:
> >> >>>
> >>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Broadcast-of-datasets-in-streaming-context-tp6456p6548.html
> >> >>> Sent from the Apache Flink User Mailing List archive. mailing list
> >> >>> archive
> >> >>> at Nabble.com.
> >> >>>
> >>
> >>
> >>
> >>
> >>
> >> --
> >> View this message in context:
> >>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Broadcast-of-datasets-in-streaming-context-tp6456p6612.html
> >> Sent from the Apache Flink User Mailing List archive. mailing list
> >> archive
> >> at Nabble.com.
> >>
>
>
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Broadcast-of-datasets-in-streaming-context-tp6456p6619.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>

Re: Regarding Broadcast of datasets in streaming context

Posted by Biplob Biswas <re...@gmail.com>.
Hi Gyula,

Could you explain a bit why I wouldn't want the centroids to be collected
after every point?

I mean, once I get a streamed point via the map1 function, I would want to
compare its distance to a centroid that arrives via the map2 function, and
keep comparing it with every centroid that comes in subsequently. Once a
centroid has been updated, shouldn't I collect the entire set, so that the
updated centroids are available for the next point in the iteration?

I may not be getting the concept properly here, so an example snippet would
help in the long run.
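A rough count helps show what "collect after every point" costs. The sketch below is plain Java, not Flink code; it assumes, as in the snippet from this thread, that the emitted centroids are fed back with `closeWith(updatedCentroids.broadcast())`, and that points are split evenly across the parallel instances. `FeedbackVolume` and `feedbackRecords` are hypothetical names:

```java
/** Hypothetical helper: rough count of records on a broadcast feedback edge. */
final class FeedbackVolume {

    private FeedbackVolume() {}

    /**
     * Centroid sets fed back when each of `parallelism` instances emits its local
     * centroids once every `emitEvery` points it processes, and every emitted set
     * is broadcast to all instances. Assumes the points are split evenly.
     */
    static long feedbackRecords(long totalPoints, int parallelism, long emitEvery) {
        long pointsPerInstance = totalPoints / parallelism;
        long emissionsPerInstance = pointsPerInstance / emitEvery;
        long totalEmissions = emissionsPerInstance * parallelism;
        // broadcast(): every emitted set is delivered to every parallel instance
        return totalEmissions * parallelism;
    }
}
```

Under those assumptions, `feedbackRecords(1_000, 4, 1)` returns 4000, i.e. four fed-back centroid sets per input point, while `feedbackRecords(1_000, 4, 100)` returns 32. It gets worse if the map2 side also emits whenever it receives a set: every received set would then trigger another broadcast, and the feedback would never die down.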

Thanks & Regards
Biplob

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Broadcast-of-datasets-in-streaming-context-tp6456p6619.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Regarding Broadcast of datasets in streaming context

Posted by Gyula Fóra <gy...@gmail.com>.
Hey,

I think you've got the right idea :)

So your CoFlatMap will receive all the centroids that you send to the
stream in the closeWith call. Whenever you collect a new set of centroids,
it will be iterated back, and since that stream is broadcast, every parallel
instance receives it. This is why you don't want to send the centroids out
on the collector after every point, only periodically.
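A minimal sketch of "only periodically", assuming a count-based policy (emit after every N points processed by one instance); a time-based policy would work just as well. It is plain Java; inside a Flink `flatMap1` one would call it after the local update and pass a present result to `out.collect(...)`. `PeriodicEmitter` is a hypothetical name. It returns a copy, to be safe, so that the emitted set cannot be changed by later local updates:

```java
import java.util.Optional;

/** Hypothetical count-based emission policy for the point side (flatMap1). */
final class PeriodicEmitter {
    private final int emitEvery;
    private long pointsSinceEmit = 0;

    PeriodicEmitter(int emitEvery) {
        if (emitEvery <= 0) {
            throw new IllegalArgumentException("emitEvery must be > 0");
        }
        this.emitEvery = emitEvery;
    }

    /**
     * Call once per processed point, after the local centroids were updated.
     * Returns a deep copy of the centroids when it is time to emit, otherwise empty.
     */
    Optional<double[][]> afterPoint(double[][] localCentroids) {
        pointsSinceEmit++;
        if (pointsSinceEmit < emitEvery) {
            return Optional.empty();
        }
        pointsSinceEmit = 0;
        double[][] snapshot = new double[localCentroids.length][];
        for (int i = 0; i < localCentroids.length; i++) {
            snapshot[i] = localCentroids[i].clone(); // copy each centroid vector
        }
        return Optional.of(snapshot);
    }
}
```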

The order in which these arrive is pretty much arbitrary, so if the order
matters, you need to add some logic by which you can order them.
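One way to add such ordering logic, sketched under the assumption that each emitted centroid set is wrapped with the index of the emitting subtask and a per-subtask counter (in a rich function the index is available via `getRuntimeContext().getIndexOfThisSubtask()`). The receiving side then keeps only the newest set per sender and ignores sets that arrive late. `VersionedCentroids` and `LatestPerSender` are hypothetical names, and the code is plain Java:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical envelope for a centroid set travelling on the feedback edge. */
final class VersionedCentroids {
    final int senderSubtask;   // which parallel instance produced this set
    final long version;        // increases with every emission of that sender
    final double[][] centroids;

    VersionedCentroids(int senderSubtask, long version, double[][] centroids) {
        this.senderSubtask = senderSubtask;
        this.version = version;
        this.centroids = centroids;
    }
}

/** Keeps only the newest centroid set seen from each sender. */
final class LatestPerSender {
    private final Map<Integer, VersionedCentroids> latest = new HashMap<>();

    /** Stores the set and returns true if it is newer than the last one from that sender. */
    boolean offer(VersionedCentroids incoming) {
        VersionedCentroids current = latest.get(incoming.senderSubtask);
        if (current != null && current.version >= incoming.version) {
            return false; // stale or duplicate: arrived out of order
        }
        latest.put(incoming.senderSubtask, incoming);
        return true;
    }

    int knownSenders() {
        return latest.size();
    }
}
```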

I'm not sure if this helped or not :D

Gyula

Re: Regarding Broadcast of datasets in streaming context

Posted by Biplob Biswas <re...@gmail.com>.
Hi Gyula,

I understand more now about how this might work, and it's fascinating.
I still have one question about the CoFlatMap function, though.

First, let me explain what I understand, and whether it's correct or not:
1. The connected iterative stream ensures that the CoFlatMap function
receives the points as well as the centroids that are broadcast on each
iteration defined by closeWith.

2. So in the CoFlatMap function, one map function gets the points and the
other map function gets the broadcast centroids.

Now comes the part where I am making assumptions, because I don't fully
understand it from the theory.
3. Assuming I can use the broadcast centroids, I calculate the centroid
nearest to the streamed point, update that centroid, and use only one of
the collectors to send the updated list of centroids back.
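Step 3 could look roughly like the following plain-Java sketch of the point side (the body of `flatMap1`). It assumes points are `double[]` vectors, uses squared Euclidean distance, and moves the nearest centroid with a running mean, i.e. the textbook sequential k-means update rather than CluStream's micro-cluster update, which keeps more statistics per cluster. `LocalCentroids` and its methods are hypothetical:

```java
/** Hypothetical local state of one parallel instance: k centroids and their point counts. */
final class LocalCentroids {
    final double[][] centroids;
    final long[] counts;

    LocalCentroids(double[][] initial) {
        centroids = new double[initial.length][];
        for (int i = 0; i < initial.length; i++) {
            centroids[i] = initial[i].clone();
        }
        counts = new long[initial.length]; // start at 0: the first assigned point replaces the seed
    }

    /** Index of the centroid with the smallest squared Euclidean distance to p. */
    int nearest(double[] p) {
        int best = -1;
        double bestDist = Double.POSITIVE_INFINITY;
        for (int i = 0; i < centroids.length; i++) {
            double d = 0.0;
            for (int j = 0; j < p.length; j++) {
                double diff = p[j] - centroids[i][j];
                d += diff * diff;
            }
            if (d < bestDist) {
                bestDist = d;
                best = i;
            }
        }
        return best;
    }

    /** Assigns p to its nearest centroid and moves that centroid to the running mean. */
    int update(double[] p) {
        int i = nearest(p);
        counts[i]++;
        for (int j = 0; j < p.length; j++) {
            centroids[i][j] += (p[j] - centroids[i][j]) / counts[i];
        }
        return i;
    }
}
```

On the collectors: `flatMap1` and `flatMap2` each get a collector, but both write to the same output stream, so which of the two emits the centroids is a design choice.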


My question here is: I am assuming that this operation is not done in
parallel, because if the streams are processed in parallel, how would I
ensure correct updates of the centroids when multiple points may try to
update the same centroid at the same time?
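On the parallelism question: with this design no two threads update the same centroid object. Each parallel instance of the CoFlatMap keeps its own local copy, and Flink feeds one instance one record at a time from either input, so updates within an instance are sequential; the copies just drift apart until they are exchanged over the feedback edge and merged. One possible merge, sketched here as an assumption (it is not prescribed anywhere in this thread), is a count-weighted average of matching centroids. It presumes that centroid i denotes the same cluster on every instance, for example because all instances started from the same initial set. `CentroidMerge` is a hypothetical name:

```java
/** Hypothetical merge of two centroid sets that share the same index-to-cluster mapping. */
final class CentroidMerge {

    private CentroidMerge() {}

    /**
     * Count-weighted average of matching centroids: centroid i of the result is
     * (countsA[i] * a[i] + countsB[i] * b[i]) / (countsA[i] + countsB[i]).
     * If both counts are zero, centroid i of `a` is kept unchanged.
     */
    static double[][] merge(double[][] a, long[] countsA, double[][] b, long[] countsB) {
        double[][] merged = new double[a.length][];
        for (int i = 0; i < a.length; i++) {
            long total = countsA[i] + countsB[i];
            merged[i] = new double[a[i].length];
            for (int j = 0; j < a[i].length; j++) {
                merged[i][j] = total == 0
                        ? a[i][j]
                        : (countsA[i] * a[i][j] + countsB[i] * b[i][j]) / total;
            }
        }
        return merged;
    }
}
```

Repeatedly merging sets that already contain each other's contributions counts those points more than once, so a real implementation would need to track which contributions it has already absorbed.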

I hope I made myself clear with this.

Thanks and Regards
Biplob

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Broadcast-of-datasets-in-streaming-context-tp6456p6612.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Regarding Broadcast of datasets in streaming context

Posted by Biplob Biswas <re...@gmail.com>.
Hi Gyula,

I read your workaround and started reading about Flink iterations, CoFlatMap
operators and other things. Now I do understand a few things, but the
solution you provided is not completely clear to me.

I understand the following things from your post:
1. You initially have a DataStream of points, on which you iterate, and
'withFeedbackType' defines the type of the connected stream, so rather than
"Points" the type is "Centroids" now.

2. On this connected stream (which, as I understand it, only has the
streamed points right now), you run a flatMap operator. And you mention:
"MyCoFlatmap would be a CoFlatMapFunction which on 1 input receive events
and update its local centroids (and periodically output the centroids) and
on the other input would send centroids of other flatmaps and would merge
them to the local."
I don't understand this part completely. If I am not wrong, you are saying
that the CoFlatMap function would have two map functions. What specifically
am I doing in each map function?

3. Lastly, the updated centroids that come out of the CoFlatMap function
are fed back into the stream, and this is where I get lost again: how are
these centroids fed back, and what happens to the point stream when they
are? And if they are somehow fed back, how do I catch them in the CoFlatMap
function?
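To illustrate what the feedback edge does, and that it does not replace the point stream, here is a small single-threaded simulation in plain Java (not Flink code). It assumes a parallelism of p, points distributed round-robin, each instance emitting a snapshot every N points, and `closeWith(updatedCentroids.broadcast())` modelled as appending every emitted snapshot to the feedback inbox of every instance, including the sender. `onPoint` stands in for `flatMap1` and `onSnapshot` for `flatMap2`; all names are hypothetical, and a `long[]` stands in for a centroid set:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical single-threaded model of a CoFlatMap inside a streaming iteration. */
final class FeedbackLoopSimulation {

    /** One parallel instance: counts points (input 1) and received snapshots (input 2). */
    static final class Instance {
        final int id;
        final Deque<long[]> feedbackInbox = new ArrayDeque<>();
        long pointsSeen = 0;
        long snapshotsReceived = 0;

        Instance(int id) {
            this.id = id;
        }

        /** Stand-in for flatMap1: returns a snapshot {sender, pointsSeen} to emit, or null. */
        long[] onPoint(int emitEvery) {
            pointsSeen++;
            return pointsSeen % emitEvery == 0 ? new long[] {id, pointsSeen} : null;
        }

        /** Stand-in for flatMap2: a real job would merge the received centroids here. */
        void onSnapshot(long[] snapshot) {
            snapshotsReceived++;
        }
    }

    static List<Instance> run(int parallelism, int totalPoints, int emitEvery) {
        List<Instance> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new Instance(i));
        }
        for (int n = 0; n < totalPoints; n++) {
            Instance target = instances.get(n % parallelism); // round-robin points
            long[] emitted = target.onPoint(emitEvery);
            if (emitted != null) {
                // broadcast feedback: every instance, including the sender, gets it
                for (Instance inst : instances) {
                    inst.feedbackInbox.add(emitted);
                }
            }
            // one possible interleaving: drain pending feedback before the next point
            for (Instance inst : instances) {
                while (!inst.feedbackInbox.isEmpty()) {
                    inst.onSnapshot(inst.feedbackInbox.poll());
                }
            }
        }
        return instances;
    }
}
```

With `run(4, 1_000, 100)` every instance sees 250 points and receives all 8 emitted snapshots, its own included. In a real job Flink decides how points and fed-back records interleave; draining the inbox after every point is just one possible order.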


If I understand this correctly, then in your code the first set of centroids
is created in the CoFlatMap function, and you don't already have a list of
centroids to start with? Am I assuming this correctly?
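The snippet in this thread does not show where the first centroids come from. Two options, sketched in plain Java as assumptions: pass an initial set into the function's constructor (Flink serializes user functions and ships a copy to every parallel instance, so each instance starts from the same set), or, if none is given, let each instance use the first k points it sees as seeds. With the second option the instances start from different seeds, so centroid i need not denote the same cluster everywhere. `CentroidSeeder` is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical seeding logic for the local centroids of one parallel instance. */
final class CentroidSeeder {
    private final int k;
    private final List<double[]> seeds = new ArrayList<>();

    /** @param initial initial centroids (e.g. handed over via the function's constructor), or null */
    CentroidSeeder(int k, double[][] initial) {
        this.k = k;
        if (initial != null) {
            for (double[] c : initial) {
                seeds.add(c.clone());
            }
        }
    }

    boolean isReady() {
        return seeds.size() >= k;
    }

    /**
     * While fewer than k seeds exist, stores the point as a seed and returns true;
     * afterwards returns false, and the caller should do the normal update instead.
     */
    boolean offerAsSeed(double[] point) {
        if (isReady()) {
            return false;
        }
        seeds.add(point.clone());
        return true;
    }

    double[][] centroids() {
        return seeds.toArray(new double[0][]);
    }
}
```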

I went through the iteration in the KMeans example at the following link:
https://github.com/stratosphere/stratosphere/blob/master/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/clustering/KMeans.java

and I understand how that works, but I am still not clear on how your
example works.

Could you please explain it a bit more, maybe with some examples?

Thanks a lot.


Gyula Fóra-2 wrote
> Hi Biplob,
> 
> I have implemented a similar algorithm as Aljoscha mentioned.
> 
> First things to clarify are the following:
> There is currently no abstraction for keeping objects (in your case
> centroids) in a centralized way that can be updated/read by all operators.
> This would probably be very costly and is actually not necessary in your
> case.
> 
> Broadcasting a stream, in contrast with other partitioning methods, means
> that the events will be replicated to all downstream operators. This is not
> a magical operator that will make state available among parallel instances.
> 
> Now let me explain what I think you want from Flink and how to do it :)
> 
> You have an input data stream and a set of centroids to be updated based on
> the incoming records. As you want to do this in parallel, you have an
> operator (let's say a flatmap) that keeps the centroids locally and updates
> them on its inputs. Now you have a set of independently updated centroids,
> so you want to merge them and update the centroids in each flatmap.
> 
> Let's see how to do this. Given that you have your centroids locally,
> updating them is super easy, so I will not talk about that. The
> problematic
> part is periodically merging end "broadcasting" the centroids so all the
> flatmaps eventually see the same (they don't have to always be the same
> for
> clustering probably). There is no operator for sending state (centroids)
> between subtasks so you have to be clever here. We can actually use cyclic
> streams to solve this problem by sending the centroids as simple events to
> a CoFlatMap:
> 
> DataStream
> <Point>
>  input = ...
> ConnectedIterativeStreams&lt;Point, Centroids&gt; inputsAndCentroids =
> input.iterate().withFeedbackType(Centroids.class)
> DataStream
> <Centroids>
>  updatedCentroids =
> inputsAndCentroids.flatMap(MyCoFlatmap)
> inputsAndCentroids.closeWith(updatedCentroids.broadcast())
> 
> MyCoFlatmap would be a CoFlatMapFunction which on 1 input receive events
> and update its local centroids (and periodically output the centroids) and
> on the other input would send centroids of other flatmaps and would merge
> them to the local.
> 
> This might be a lot to take in at first, so you might want to read up on
> streaming iterations and connected streams before you start.
> 
> Let me know if this makes sense.
> 
> Cheers,
> Gyula
> 
> 
> Biplob Biswas &lt;

> revolutionisme@

> &gt; ezt írta (időpont: 2016. ápr. 28.,
> Cs, 14:41):
> 
>> That would really be great, any example would help me proceed with my
>> work.
>> Thanks a lot.
>>
>>
>> Aljoscha Krettek wrote
>> > Hi Biplob,
>> > one of our developers had a stream clustering example a while back. It
>> was
>> > using a broadcast feedback edge with a co-operator to update the
>> > centroids.
>> > I'll directly include him in the email so that he will notice and can
>> send
>> > you the example.
>> >
>> > Cheers,
>> > Aljoscha
>> >
>> > On Thu, 28 Apr 2016 at 13:57 Biplob Biswas &lt;
>>
>> > revolutionisme@
>>
>> > &gt; wrote:
>> >
>> >> I am pretty new to flink systems, thus can anyone atleast give me an
>> >> example
>> >> of how datastream.broadcast() method works? From the documentation i
>> get
>> >> the
>> >> following:
>> >>
>> >> broadcast()
>> >> Sets the partitioning of the DataStream so that the output elements
>> are
>> >> broadcasted to every parallel instance of the next operation.
>> >>
>> >> If the output elements are broadcasted, then how are they retrieved?
>> Or
>> >> maybe I am looking at this method in a completely wrong way?
>> >>
>> >> Thanks
>> >> Biplob Biswas
>> >>
>> >>
>> >>
>> >> --
>> >> View this message in context:
>> >>
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Broadcast-of-datasets-in-streaming-context-tp6456p6543.html
>> >> Sent from the Apache Flink User Mailing List archive. mailing list
>> >> archive
>> >> at Nabble.com.
>> >>
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Broadcast-of-datasets-in-streaming-context-tp6456p6548.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive
>> at Nabble.com.
>>





--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Broadcast-of-datasets-in-streaming-context-tp6456p6590.html

Re: Regarding Broadcast of datasets in streaming context

Posted by Gyula Fóra <gy...@apache.org>.
Hi Biplob,

As Aljoscha mentioned, I have implemented a similar algorithm.

The first things to clarify are the following:
There is currently no abstraction for keeping objects (in your case,
centroids) in a centralized way so that they can be updated/read by all
operators. This would probably be very costly and is actually not necessary
in your case.

Broadcasting a stream, in contrast with other partitioning methods, means
that the events will be replicated to all parallel instances of the
downstream operator. It is not a magical operator that makes state available
among parallel instances.
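To make the difference concrete, here is a plain-Java simulation (not Flink code) of what a downstream operator with parallelism 2 would receive. `broadcast` and `roundRobin` are hypothetical helpers that only mimic the routing; in a real job each parallel instance simply sees the broadcast records as ordinary input to its map/flatMap, with no special retrieval step.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation (no Flink dependency) of how a partitioning strategy
// routes records to the parallel instances of the next operator. The method
// names are hypothetical; they only mimic the routing, not Flink's API.
public class BroadcastDemo {

    // Broadcast: every record is copied to every parallel instance.
    static <T> List<List<T>> broadcast(List<T> records, int parallelism) {
        List<List<T>> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new ArrayList<>(records)); // a full copy per instance
        }
        return instances;
    }

    // Round-robin: every record goes to exactly one instance.
    static <T> List<List<T>> roundRobin(List<T> records, int parallelism) {
        List<List<T>> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new ArrayList<>());
        }
        for (int i = 0; i < records.size(); i++) {
            instances.get(i % parallelism).add(records.get(i));
        }
        return instances;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("a", "b", "c");
        System.out.println("broadcast:   " + broadcast(events, 2));  // [[a, b, c], [a, b, c]]
        System.out.println("round-robin: " + roundRobin(events, 2)); // [[a, c], [b]]
    }
}
```

Each instance ends up with its own independent copy: changing what one instance holds does not change the others, which is why the centroids still have to be merged explicitly.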

Now let me explain what I think you want from Flink and how to do it :)

You have an input data stream and a set of centroids to be updated based on
the incoming records. As you want to do this in parallel, you have an
operator (let's say a flatmap) that keeps the centroids locally and updates
them on its inputs. Now you have a set of independently updated centroids,
so you want to merge them and update the centroids in each flatmap.

Let's see how to do this. Given that you have your centroids locally,
updating them is super easy, so I will not talk about that. The problematic
part is periodically merging and "broadcasting" the centroids so that all the
flatmaps eventually see the same ones (for clustering they probably don't
always have to be the same). There is no operator for sending state
(centroids) between subtasks, so you have to be clever here. We can actually
use cyclic streams to solve this problem by sending the centroids as simple
events to a CoFlatMap:

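// Point and Centroids are your own types; "..." stands for your point source.
DataStream<Point> input = ...;
// Open an iteration whose feedback edge carries Centroids instead of Points.
ConnectedIterativeStreams<Point, Centroids> inputsAndCentroids =
    input.iterate().withFeedbackType(Centroids.class);
// MyCoFlatmap implements CoFlatMapFunction<Point, Centroids, Centroids>.
DataStream<Centroids> updatedCentroids =
    inputsAndCentroids.flatMap(new MyCoFlatmap());
// Close the loop: every emitted Centroids record goes back to ALL instances.
inputsAndCentroids.closeWith(updatedCentroids.broadcast());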

MyCoFlatmap would be a CoFlatMapFunction which, on one input, receives the
events and updates its local centroids (periodically outputting them), and,
on the other input, receives the centroids sent by the other flatmaps and
merges them into its local ones.
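The two inputs of such a CoFlatMap can be sketched in plain Java as below. This is not Flink code: `onPoint` and `onCentroids` stand in for `flatMap1` and `flatMap2` of a `CoFlatMapFunction<Point, Centroids, Centroids>`, and a `java.util.function.Consumer` stands in for the `Collector` so the logic runs without Flink. The class and method names, the `Snapshot` type, the emit interval and the count-weighted merge rule are all illustrative assumptions (one simple heuristic among many), and the sketch assumes every subtask starts from the same initial centroids so that centroid k means the same cluster everywhere.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java sketch (not Flink code) of the two inputs of a centroid
// CoFlatMap: onPoint plays the role of flatMap1 and onCentroids the role of
// flatMap2; a Consumer stands in for Flink's Collector. Names, the emit
// interval and the count-weighted merge rule are illustrative assumptions.
public class CentroidCoFlatMap {

    // Hypothetical feedback record: one subtask's centroids and their counts.
    static final class Snapshot {
        final int senderId;
        final double[][] centers;
        final long[] counts;

        Snapshot(int senderId, double[][] centers, long[] counts) {
            this.senderId = senderId;
            this.centers = centers;
            this.counts = counts;
        }
    }

    final int subtaskId;
    final int emitEvery; // emit a snapshot after this many points
    final double[][] centers;
    final long[] counts;
    long seen = 0;

    CentroidCoFlatMap(int subtaskId, int emitEvery, double[][] initial) {
        this.subtaskId = subtaskId;
        this.emitEvery = emitEvery;
        this.centers = new double[initial.length][];
        for (int k = 0; k < initial.length; k++) centers[k] = initial[k].clone();
        this.counts = new long[initial.length];
        Arrays.fill(counts, 1); // each seed centroid counts as one observation
    }

    // Input 1: a point moves its nearest centroid (online k-means running mean).
    void onPoint(double[] p, Consumer<Snapshot> out) {
        int best = 0;
        double bestDist = Double.MAX_VALUE;
        for (int k = 0; k < centers.length; k++) {
            double d = 0;
            for (int j = 0; j < p.length; j++) {
                double diff = p[j] - centers[k][j];
                d += diff * diff;
            }
            if (d < bestDist) {
                bestDist = d;
                best = k;
            }
        }
        counts[best]++;
        for (int j = 0; j < p.length; j++) {
            centers[best][j] += (p[j] - centers[best][j]) / counts[best];
        }
        if (++seen % emitEvery == 0) out.accept(snapshot()); // periodic output
    }

    // Input 2: centroids from another subtask are blended into the local ones.
    void onCentroids(Snapshot s, Consumer<Snapshot> out) {
        if (s.senderId == subtaskId) return; // broadcast echoes our own snapshot
        for (int k = 0; k < centers.length; k++) {
            double total = counts[k] + s.counts[k];
            for (int j = 0; j < centers[k].length; j++) {
                centers[k][j] = (counts[k] * centers[k][j] + s.counts[k] * s.centers[k][j]) / total;
            }
            // counts[k] stays unchanged so repeated merges do not inflate it
        }
    }

    // Deep copy, so later local updates do not mutate an emitted record.
    Snapshot snapshot() {
        double[][] copy = new double[centers.length][];
        for (int k = 0; k < centers.length; k++) copy[k] = centers[k].clone();
        return new Snapshot(subtaskId, copy, counts.clone());
    }

    public static void main(String[] args) {
        double[][] seeds = {{0, 0}, {10, 10}};
        CentroidCoFlatMap a = new CentroidCoFlatMap(0, 2, seeds);
        CentroidCoFlatMap b = new CentroidCoFlatMap(1, 2, seeds);
        List<Snapshot> emitted = new ArrayList<>();
        a.onPoint(new double[] {2, 0}, emitted::add);
        a.onPoint(new double[] {4, 0}, emitted::add); // 2nd point: snapshot emitted
        b.onCentroids(emitted.get(0), s -> { });
        System.out.println(Arrays.toString(b.centers[0])); // [1.5, 0.0]
    }
}
```

Because the feedback edge is broadcast, a subtask also receives its own snapshot, which the sketch skips via `senderId`. Leaving the counts untouched on merge keeps them from growing every time the same information circulates again, at the cost of the merged positions being only an approximation.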

This might be a lot to take in at first, so you might want to read up on
streaming iterations and connected streams before you start.
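To see how the cycle behaves end to end, here is a toy, single-threaded simulation in plain Java (again not Flink code; all names are hypothetical). Points are spread round-robin over the parallel subtasks as usual, and every snapshot a subtask emits travels back over the broadcast feedback edge to all subtasks. To keep the mechanics visible, each subtask tracks a single 1-D mean instead of centroids, and draining the feedback after every point is an assumed schedule: Flink does not guarantee any ordering between the point input and the feedback input.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy simulation of the cyclic topology: round-robin points in, broadcast
// snapshots fed back. A 1-D mean stands in for the centroids (assumption).
public class FeedbackLoopSim {

    // Cumulative local statistics of one subtask, sent over the feedback edge.
    static final class Snapshot {
        final int sender;
        final double sum;
        final long count;

        Snapshot(int sender, double sum, long count) {
            this.sender = sender;
            this.sum = sum;
            this.count = count;
        }
    }

    static final class Subtask {
        final int id;
        double localSum = 0; // from this subtask's own points only
        long localCount = 0;
        final Map<Integer, Snapshot> latestFromPeers = new HashMap<>();
        final ArrayDeque<Snapshot> feedbackInbox = new ArrayDeque<>(); // 2nd input

        Subtask(int id) { this.id = id; }

        // First input: absorb a point and emit the updated cumulative snapshot.
        Snapshot onPoint(double x) {
            localSum += x;
            localCount++;
            return new Snapshot(id, localSum, localCount);
        }

        // Second input: snapshots are cumulative, so replace (never add to) the
        // previous one from the same sender; our own echo is ignored.
        void onSnapshot(Snapshot s) {
            if (s.sender != id) latestFromPeers.put(s.sender, s);
        }

        double globalMean() {
            double sum = localSum;
            long count = localCount;
            for (Snapshot s : latestFromPeers.values()) {
                sum += s.sum;
                count += s.count;
            }
            return count == 0 ? Double.NaN : sum / count;
        }
    }

    static List<Subtask> run(double[] points, int parallelism) {
        List<Subtask> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) subtasks.add(new Subtask(i));
        for (int i = 0; i < points.length; i++) {
            // The point stream keeps flowing: each point reaches one subtask.
            Snapshot emitted = subtasks.get(i % parallelism).onPoint(points[i]);
            // Broadcast feedback edge: every subtask gets a copy of the snapshot.
            for (Subtask t : subtasks) t.feedbackInbox.add(emitted);
            // Assumed scheduling: each subtask drains its feedback input now.
            for (Subtask t : subtasks) {
                while (!t.feedbackInbox.isEmpty()) t.onSnapshot(t.feedbackInbox.poll());
            }
        }
        return subtasks;
    }

    public static void main(String[] args) {
        for (Subtask t : run(new double[] {1, 3, 5, 7}, 2)) {
            System.out.println("subtask " + t.id + " sees mean " + t.globalMean());
        }
        // subtask 0 sees mean 4.0
        // subtask 1 sees mean 4.0
    }
}
```

Because each snapshot carries the sender's cumulative local statistics, a receiver replaces the previous snapshot from that sender instead of adding to it. With the points 1, 3, 5, 7 and two subtasks, both subtasks end up seeing the overall mean 4.0 even though each processed only two of the points.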

Let me know if this makes sense.

Cheers,
Gyula


On Thu, 28 Apr 2016 at 14:41, Biplob Biswas <re...@gmail.com> wrote:

> That would really be great, any example would help me proceed with my work.
> Thanks a lot.

Re: Regarding Broadcast of datasets in streaming context

Posted by Biplob Biswas <re...@gmail.com>.
That would be great; any example would help me proceed with my work.
Thanks a lot.


Aljoscha Krettek wrote
> Hi Biplob,
> one of our developers had a stream clustering example a while back. It was
> using a broadcast feedback edge with a co-operator to update the
> centroids.
> I'll directly include him in the email so that he will notice and can send
> you the example.
> 
> Cheers,
> Aljoscha

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Broadcast-of-datasets-in-streaming-context-tp6456p6548.html

Re: Regarding Broadcast of datasets in streaming context

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Biplob,
one of our developers had a stream clustering example a while back. It used
a broadcast feedback edge with a co-operator to update the centroids.
I'll include him directly in this email so that he notices it and can send
you the example.

Cheers,
Aljoscha

On Thu, 28 Apr 2016 at 13:57 Biplob Biswas <re...@gmail.com> wrote:

> I am pretty new to flink systems, thus can anyone atleast give me an
> example
> of how datastream.broadcast() method works? From the documentation i get
> the
> following:
>
> broadcast()
> Sets the partitioning of the DataStream so that the output elements are
> broadcasted to every parallel instance of the next operation.
>
> If the output elements are broadcasted, then how are they retrieved? Or
> maybe I am looking at this method in a completely wrong way?
>
> Thanks
> Biplob Biswas

Re: Regarding Broadcast of datasets in streaming context

Posted by Biplob Biswas <re...@gmail.com>.
I am pretty new to Flink, so could anyone at least give me an example of how
the DataStream.broadcast() method works? From the documentation I get the
following:

broadcast()
Sets the partitioning of the DataStream so that the output elements are
broadcasted to every parallel instance of the next operation.

If the output elements are broadcast, how are they retrieved? Or maybe I am
looking at this method in a completely wrong way?

Thanks
Biplob Biswas



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Broadcast-of-datasets-in-streaming-context-tp6456p6543.html