You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Aljoscha Krettek (JIRA)" <ji...@apache.org> on 2015/08/03 13:57:04 UTC

[jira] [Commented] (FLINK-2470) Stream Iteration can Hang after some Data

    [ https://issues.apache.org/jira/browse/FLINK-2470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14651780#comment-14651780 ] 

Aljoscha Krettek commented on FLINK-2470:
-----------------------------------------

This is the code I was using. Notice that the loop hangs sooner if you feed in elements on the 9999 socket stream (which is broadcast). Also, when activating the other feedback stream and deactivating the broadcast feedback stream I didn't manage to make it hang.

{code}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

import java.util.Collections;

/**
 * Reproduction job for FLINK-2470 ("Stream Iteration can Hang after some Data").
 *
 * <p>Reads text lines from two local sockets, routes them by their first letter into
 * three named outputs ("a", "b", "c") and sends the "a" elements through a co-iteration
 * in which a counter is incremented on every pass until it reaches 10.
 *
 * <p>The statements are intentionally kept exactly as posted: the partitioning calls and
 * the choice of which feedback edge is closed are the subject of the report.
 */
public class IterationTest {

	/**
	 * Builds the streaming topology, prints its execution plan and runs the job.
	 *
	 * @param args command-line arguments (not read)
	 * @throws Exception declared by the Flink calls used below
	 */
	public static void main(String[] args) throws Exception {

		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(4);

		// Two socket sources: port 9999 is broadcast, port 9998 is shuffled.
		DataStream<String> text = env.socketTextStream("localhost", 9999).broadcast();
		DataStream<String> text2 = env.socketTextStream("localhost", 9998).shuffle();
//		DataStream<String> union = text.union(text2);

		// Route every line of the unioned input to exactly one named output,
		// chosen by its prefix: "a", "b", or "c" for everything else.
		SplitDataStream<String> split = text.union(text2).split(new OutputSelector<String>() {
			private static final long serialVersionUID = 1L;

			@Override
			public Iterable<String> select(String value) {
				if (value.startsWith("a")) {
					System.out.println("SELECT TO A");
					return Collections.singleton("a");
				} else if (value.startsWith("b")) {
					System.out.println("SELECT TO B");
					return Collections.singleton("b");
				} else {
					System.out.println("SELECT TO C");
					return Collections.singleton("c");
				}
			}
		});


		// "b" branch: log the element and print it unchanged.
		split.select("b").map(new MapFunction<String, String>() {
			private static final long serialVersionUID = 1L;
			@Override
			public String map(String value) throws Exception {
				System.out.println("MAP B: " + value);
				return value;
			}
		}).print();

		// "c" branch: log the element and print it unchanged.
		split.select("c").map(new MapFunction<String, String>() {
			private static final long serialVersionUID = 1L;
			@Override
			public String map(String value) throws Exception {
				System.out.println("MAP C: " + value);
				return value;
			}
		}).print();

		// "a" branch: log the element, pair it with an initial counter of 1,
		// and use the resulting stream as the head of the iteration.
		IterativeDataStream<Tuple2<String, Integer>> iteration = split.select("a").map(new MapFunction<String, String>() {
			private static final long serialVersionUID = 1L;

			@Override
			public String map(String value) throws Exception {
				System.out.println("MAP A: " + value);
				return value;
			}
		})
				.map(new MapFunction<String, Tuple2<String, Integer>>() {
					private static final long serialVersionUID = 1L;

					@Override
					public Tuple2<String, Integer> map(String value) throws Exception {
						return new Tuple2<String, Integer>(value, 1);
					}
				}).iterate();

		// Turn the iteration into a co-iteration; the feedback type is given as a
		// type string and is the same tuple type as the iteration input.
		IterativeDataStream.ConnectedIterativeDataStream<Tuple2<String, Integer>, Tuple2<String, Integer>> coIter = iteration.<Tuple2<String, Integer>>withFeedbackType("Tuple2<String, Integer>");

		// First co-map on the iteration: map1 logs "INITIAL", map2 logs "FEEDBACK";
		// both increment the counter field (f1) in place and forward the tuple.
		SingleOutputStreamOperator<Tuple2<String, Integer>, ?> iter1Map = coIter
				.map(new CoMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
					private static final long serialVersionUID = 1L;

					@Override
					public Tuple2<String, Integer> map1(Tuple2<String, Integer> value) throws Exception {
						System.out.println("INITIAL 1" + value);
						value.f1++;
						return value;
					}

					@Override
					public Tuple2<String, Integer> map2(Tuple2<String, Integer> value) throws Exception {
						System.out.println("FEEDBACK 1"  +value);
						value.f1++;
						return value;
					}
				}).setParallelism(4);

		// Second co-map on the same co-iteration, with the same logic as the first
		// apart from the log labels.
		SingleOutputStreamOperator<Tuple2<String, Integer>, ?> iter2Map = coIter
				.map(new CoMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
					private static final long serialVersionUID = 1L;

					@Override
					public Tuple2<String, Integer> map1(Tuple2<String, Integer> value) throws Exception {
						System.out.println("INITIAL 2 " + value);
						value.f1++;
						return value;
					}

					@Override
					public Tuple2<String, Integer> map2(Tuple2<String, Integer> value) throws Exception {
						System.out.println("FEEDBACK 2 "  +value);
						value.f1++;
						return value;
					}
				}).setParallelism(4);

		// Identity map after the first co-map; its output is broadcast.
		SingleOutputStreamOperator<Tuple2<String, Integer>, ? extends SingleOutputStreamOperator<Tuple2<String, Integer>, ?>> map1 = iter1Map.map(
				new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
					private static final long serialVersionUID = 1L;

					@Override
					public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
						return value;
					}
				}).broadcast();

		// Identity map after the second co-map; its output is rebalanced.
		SingleOutputStreamOperator<Tuple2<String, Integer>, ? extends SingleOutputStreamOperator<Tuple2<String, Integer>, ?>> map2 = iter2Map.map(
				new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
					private static final long serialVersionUID = 1L;

					@Override
					public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
						return value;
					}
				}).rebalance();


		// Tuples whose counter is still below 10 go to "loop", all others to "end".
		SplitDataStream<Tuple2<String, Integer>> iter1Split = map1.split(new OutputSelector<Tuple2<String, Integer>>() {
			private static final long serialVersionUID = 1L;

			@Override
			public Iterable<String> select(Tuple2<String, Integer> value) {
				if (value.f1 < 10) {
					return Collections.singleton("loop");
				} else {
					return Collections.singleton("end");
				}
			}
		});

		// Same "loop"/"end" routing for the second (rebalanced) branch.
		SplitDataStream<Tuple2<String, Integer>> iter2Split = map2.split(new OutputSelector<Tuple2<String, Integer>>() {
			private static final long serialVersionUID = 1L;

			@Override
			public Iterable<String> select(Tuple2<String, Integer> value) {
				if (value.f1 < 10) {
					return Collections.singleton("loop");
				} else {
					return Collections.singleton("end");
				}
			}
		});

		// Only the broadcast branch is fed back into the iteration. The line below is
		// the alternative (rebalanced) feedback edge, left disabled on purpose; with
		// it disabled, the "loop" output of iter2Split is not consumed anywhere.
		coIter.closeWith(iter1Split.select("loop"));
//		coIter.closeWith(iter2Split.select("loop"));

		// Tuples that have left the loop are printed from both branches.
		iter1Split.select("end").print();
		iter2Split.select("end").print();

		// Print the execution plan before starting the job.
		System.out.println(env.getExecutionPlan());
		env.execute("IterationTest");
	}

}
{code}

> Stream Iteration can Hang after some Data
> -----------------------------------------
>
>                 Key: FLINK-2470
>                 URL: https://issues.apache.org/jira/browse/FLINK-2470
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>            Reporter: Aljoscha Krettek
>
> I was trying out a (rather contrived) Co-Iteration example job, and at some point the elements are no longer fed back (or emitted to the sink).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)