Posted to user@flink.apache.org by Biplob Biswas <re...@gmail.com> on 2016/05/12 09:17:11 UTC

Unexpected behaviour in datastream.broadcast()

Hi,

I am running the following sample code to understand how iteration and
broadcast work in a streaming context.

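import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class BroadcastIterationExample {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        long i = 5;
        DataStream<Long> mainInput = env.generateSequence(2, 8);
        // Note: this stream is created but never connected to the iteration below.
        DataStream<Long> initialIterateInput = env.fromElements(i);

        // Input 1 of the co-iteration is mainInput; input 2 is a feedback stream of Longs.
        IterativeStream.ConnectedIterativeStreams<Long, Long> iteration =
                mainInput.iterate().withFeedbackType(BasicTypeInfo.LONG_TYPE_INFO);

        DataStream<Long> iterateHead = iteration
                .flatMap(new CoFlatMapFunction<Long, Long, Long>() {
                    // Each parallel instance has its own copy of this field.
                    long globalVal = 1;

                    @Override
                    public void flatMap1(Long value, Collector<Long> out) throws Exception {
                        // Called for elements of mainInput.
                        Thread.sleep(3000);
                        System.out.println("SEEING FROM INPUT 1: " + value + ", " + globalVal);
                        //globalVal = globalVal + value;
                        out.collect(globalVal + value);
                    }

                    @Override
                    public void flatMap2(Long value, Collector<Long> out) throws Exception {
                        // Called for elements arriving over the feedback edge.
                        Thread.sleep(1000);
                        globalVal = value;
                        System.out.println("SEEING FROM INPUT 2: " + value + ", " + globalVal);
                        //out.collect(value);
                    }
                });

        // Feed the results back to the iteration head, sending a copy to every parallel instance.
        iteration.closeWith(iterateHead.broadcast());

        iterateHead.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("SEEING OUTPUT FROM ITERATION: " + value);
                return value;
            }
        });

        // The job only runs once execute() is called; added here so the example is complete.
        env.execute();
    }
}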

I was expecting that after out.collect(globalVal + value) is called, the
value would be broadcast to every partition, as specified by the closeWith
statement. I was also expecting the broadcast value to arrive at the
flatMap2 function and update globalVal in every partition.
Instead, the values are not broadcast and iterated the way I expected,
and I am getting the following output:

SEEING FROM INPUT 1: 2, 1
SEEING OUTPUT FROM ITERATION: 3
SEEING FROM INPUT 1: 3, 1
SEEING OUTPUT FROM ITERATION: 4
SEEING FROM INPUT 1: 4, 1
SEEING FROM INPUT 1: 5, 1
SEEING OUTPUT FROM ITERATION: 5
SEEING OUTPUT FROM ITERATION: 6
SEEING FROM INPUT 2: 5, 5
SEEING FROM INPUT 2: 6, 6
SEEING FROM INPUT 1: 6, 1
SEEING OUTPUT FROM ITERATION: 7
SEEING FROM INPUT 1: 7, 1
SEEING OUTPUT FROM ITERATION: 8
SEEING FROM INPUT 1: 8, 1
SEEING OUTPUT FROM ITERATION: 9
SEEING FROM INPUT 2: 4, 4
SEEING FROM INPUT 2: 4, 4
SEEING FROM INPUT 2: 5, 5
SEEING FROM INPUT 2: 5, 5
SEEING FROM INPUT 2: 3, 3
SEEING FROM INPUT 2: 6, 6
SEEING FROM INPUT 2: 6, 6
SEEING FROM INPUT 2: 6, 6
SEEING FROM INPUT 2: 9, 9
SEEING FROM INPUT 2: 3, 3
SEEING FROM INPUT 2: 4, 4
SEEING FROM INPUT 2: 4, 4
SEEING FROM INPUT 2: 8, 8
SEEING FROM INPUT 2: 5, 5
SEEING FROM INPUT 2: 3, 3
SEEING FROM INPUT 2: 3, 3
SEEING FROM INPUT 2: 7, 7
SEEING FROM INPUT 2: 9, 9
SEEING FROM INPUT 2: 9, 9
SEEING FROM INPUT 2: 9, 9
SEEING FROM INPUT 2: 8, 8
SEEING FROM INPUT 2: 8, 8
SEEING FROM INPUT 2: 8, 8
SEEING FROM INPUT 2: 7, 7
SEEING FROM INPUT 2: 7, 7
SEEING FROM INPUT 2: 7, 7
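One way to check the output above is to tally the feedback lines. The small, self-contained Java sketch below (the class name BroadcastTally and its methods are made up for illustration) counts how often each value appears on a "SEEING FROM INPUT 2" line in the part of the log that starts at the first INPUT 2 line. Each fed-back value from 3 to 9 appears exactly four times, once per parallel instance at parallelism 4. That suggests the broadcast feedback did reach every instance. The INPUT 1 lines all still show globalVal 1, which would be consistent with those elements having been processed before any feedback arrived at the same instance.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: counts how often each value appears on a feedback ("INPUT 2") line.
class BroadcastTally {

    // Captures the first number on lines such as "SEEING FROM INPUT 2: 5, 5".
    private static final Pattern FEEDBACK = Pattern.compile("SEEING FROM INPUT 2: (\\d+),");

    // Excerpt of the printed log, from the first INPUT 2 line to the end.
    static final String[] LOG_EXCERPT = {
        "SEEING FROM INPUT 2: 5, 5", "SEEING FROM INPUT 2: 6, 6",
        "SEEING FROM INPUT 1: 6, 1", "SEEING OUTPUT FROM ITERATION: 7",
        "SEEING FROM INPUT 1: 7, 1", "SEEING OUTPUT FROM ITERATION: 8",
        "SEEING FROM INPUT 1: 8, 1", "SEEING OUTPUT FROM ITERATION: 9",
        "SEEING FROM INPUT 2: 4, 4", "SEEING FROM INPUT 2: 4, 4",
        "SEEING FROM INPUT 2: 5, 5", "SEEING FROM INPUT 2: 5, 5",
        "SEEING FROM INPUT 2: 3, 3", "SEEING FROM INPUT 2: 6, 6",
        "SEEING FROM INPUT 2: 6, 6", "SEEING FROM INPUT 2: 6, 6",
        "SEEING FROM INPUT 2: 9, 9", "SEEING FROM INPUT 2: 3, 3",
        "SEEING FROM INPUT 2: 4, 4", "SEEING FROM INPUT 2: 4, 4",
        "SEEING FROM INPUT 2: 8, 8", "SEEING FROM INPUT 2: 5, 5",
        "SEEING FROM INPUT 2: 3, 3", "SEEING FROM INPUT 2: 3, 3",
        "SEEING FROM INPUT 2: 7, 7", "SEEING FROM INPUT 2: 9, 9",
        "SEEING FROM INPUT 2: 9, 9", "SEEING FROM INPUT 2: 9, 9",
        "SEEING FROM INPUT 2: 8, 8", "SEEING FROM INPUT 2: 8, 8",
        "SEEING FROM INPUT 2: 8, 8", "SEEING FROM INPUT 2: 7, 7",
        "SEEING FROM INPUT 2: 7, 7", "SEEING FROM INPUT 2: 7, 7",
    };

    // Returns value -> number of feedback lines carrying it, sorted by value.
    static Map<Long, Integer> countFeedback(String... lines) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            Matcher m = FEEDBACK.matcher(line);
            if (m.find()) {
                counts.merge(Long.parseLong(m.group(1)), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // With a broadcast feedback edge, each value is expected once per parallel instance.
        System.out.println(countFeedback(LOG_EXCERPT));
        // prints {3=4, 4=4, 5=4, 6=4, 7=4, 8=4, 9=4}
    }
}
```

The same method can be pointed at a full log; lines from input 1 and from the output map simply do not match the pattern and are skipped.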


Can anyone please explain this behaviour? Why does the iteration only start
after all the elements of the first input stream have been read? What if it
is an infinite stream; would the iteration wait for it to finish?

Thanks and Regards



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Unexpected-behaviour-in-datastream-broadcast-tp6848.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Unexpected behaviour in datastream.broadcast()

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
there is no guarantee on the order in which the elements are processed, so
it can happen that most elements from input one get processed before
elements from the feedback get processed. With an infinite first input this
will of course not happen.

To understand what's going on, it might also help to print the parallel
subtask index of the operator where you are printing output. For that you
have to use a RichCoFlatMapFunction. In there, you can call
getRuntimeContext().getIndexOfThisSubtask() to find out which parallel
instance of the operator is running.
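Following that suggestion, a minimal sketch of such a function might look like this. It assumes the Flink 1.x DataStream API used in the original job, and the class name SubtaskAwareCoFlatMap and its tag helper are hypothetical. It mirrors the original CoFlatMapFunction (without the sleeps) and prefixes every printed line with the value of getRuntimeContext().getIndexOfThisSubtask().

```java
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

// Hypothetical replacement for the anonymous CoFlatMapFunction in the original job.
// Assumes the Flink 1.x API, where RuntimeContext offers getIndexOfThisSubtask().
class SubtaskAwareCoFlatMap extends RichCoFlatMapFunction<Long, Long, Long> {

    // Every parallel instance holds its own copy of this field.
    private long globalVal = 1;

    // Builds the "[subtask N] " prefix; static so it can be checked without a running job.
    static String tag(int subtaskIndex, String message) {
        return "[subtask " + subtaskIndex + "] " + message;
    }

    private void log(String message) {
        System.out.println(tag(getRuntimeContext().getIndexOfThisSubtask(), message));
    }

    @Override
    public void flatMap1(Long value, Collector<Long> out) {
        // Element from mainInput: emit it combined with this instance's current globalVal.
        log("SEEING FROM INPUT 1: " + value + ", " + globalVal);
        out.collect(globalVal + value);
    }

    @Override
    public void flatMap2(Long value, Collector<Long> out) {
        // Element from the broadcast feedback edge: update this instance's globalVal.
        globalVal = value;
        log("SEEING FROM INPUT 2: " + value + ", " + globalVal);
    }
}
```

It would be plugged in as iteration.flatMap(new SubtaskAwareCoFlatMap()). With a broadcast feedback edge and parallelism 4, each fed-back value would be expected to show up once for each subtask index from 0 to 3, which makes it easier to see which instance processed which element and in what order.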

Cheers,
Aljoscha

On Sat, 14 May 2016 at 19:55 Biplob Biswas <re...@gmail.com> wrote:

> Can anyone help me understand how out.collect() and the corresponding
> broadcast() work?

Re: Unexpected behaviour in datastream.broadcast()

Posted by Biplob Biswas <re...@gmail.com>.
Can anyone help me understand how out.collect() and the corresponding
broadcast() work?