Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/11/07 16:18:00 UTC

[jira] [Commented] (FLINK-10809) Using DataStreamUtils.reinterpretAsKeyedStream produces corrupted keyed state after restore

    [ https://issues.apache.org/jira/browse/FLINK-10809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16678454#comment-16678454 ] 

ASF GitHub Bot commented on FLINK-10809:
----------------------------------------

StefanRRichter opened a new pull request #7048: [FLINK-10809][state] Include keyed state that is not from head operators in state assignment
URL: https://github.com/apache/flink/pull/7048
   
   ## What is the purpose of the change
   
   This PR includes keyed state that does not come from a head operator (the first operator of an operator chain) in the state assignment. This fixes problems with restoring keyed state for operators downstream of `DataStreamUtils.reinterpretAsKeyedStream`.
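   
   A minimal sketch of the kind of topology that hits this code path (hypothetical `EventSource`, `Event`, and `StatefulCheckFunction`; not code from this PR): the `map` chains onto the source, so the keyed `flatMap` produced via `reinterpretAsKeyedStream` is not the head of its operator chain.
   
   ```java
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.datastream.DataStreamUtils;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
   
   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   
   // Hypothetical source that emits events already partitioned by key.
   DataStream<Event> preKeyed = env
       .addSource(new EventSource())
       .map(event -> event); // chains onto the source, partitioning is preserved
   
   // The flatMap uses keyed state, but it shares a chain with the operators
   // above instead of starting a new one, so it is not a head operator.
   DataStreamUtils.reinterpretAsKeyedStream(preKeyed, Event::getKey)
       .flatMap(new StatefulCheckFunction())
       .addSink(new PrintSinkFunction<>());
   ```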
   
   
   ## Brief change log
   
   Remove the check whether keyed state comes from a head operator in the state assignment algorithm. The check was an optimization from the time when Flink only allowed keyed state in head operators (i.e., in the operator directly after a `keyBy`).
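   
   In pseudocode, the change amounts to the following (a simplified sketch with hypothetical method names, not the literal diff of `StateAssignmentOperation`):
   
   ```java
   // Before (simplified): keyed state was only re-assigned when the operator
   // was the head of its chain, so non-head keyed state was silently dropped.
   if (isHeadOperator(operatorState)) {
       assignKeyedStateToSubtasks(operatorState, newParallelism);
   }
   
   // After (simplified): keyed state is re-assigned regardless of the
   // operator's position in its chain.
   assignKeyedStateToSubtasks(operatorState, newParallelism);
   ```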
   
   
   ## Verifying this change
   
   Extended `ReinterpretDataStreamAsKeyedStreamITCase` with a recovery cycle to test proper state restore of non-head operators.
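   
   The recovery cycle follows the usual pattern for such ITCases: fail the pipeline once after a checkpoint has completed, let it restore, and assert that the keyed state of the non-head operator survived. A minimal sketch of a once-failing mapper for this purpose (hypothetical, not the literal test code):
   
   ```java
   import org.apache.flink.api.common.functions.RichMapFunction;
   
   public static class OnceFailingMapper extends RichMapFunction<Event, Event> {
   
       private static volatile boolean hasFailed = false;
   
       @Override
       public Event map(Event value) throws Exception {
           // Throw exactly once on subtask 0 to force a restore from the
           // latest checkpoint; the job then continues from restored state.
           if (!hasFailed && getRuntimeContext().getIndexOfThisSubtask() == 0) {
               hasFailed = true;
               throw new RuntimeException("Intentional failure to trigger recovery");
           }
           return value;
       }
   }
   ```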
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not applicable)
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Using DataStreamUtils.reinterpretAsKeyedStream produces corrupted keyed state after restore
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-10809
>                 URL: https://issues.apache.org/jira/browse/FLINK-10809
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, State Backends, Checkpointing
>    Affects Versions: 1.7.0
>            Reporter: Dawid Wysakowicz
>            Priority: Major
>              Labels: pull-request-available
>
> I've tried using {{DataStreamUtils.reinterpretAsKeyedStream}} on the results of a windowed aggregation:
> {code}
> 		DataStream<Tuple2<Integer, List<Event>>> eventStream4 = eventStream2.keyBy(Event::getKey)
> 			.window(SlidingEventTimeWindows.of(Time.milliseconds(150 * 3), Time.milliseconds(150)))
> 			.apply(new WindowFunction<Event, Tuple2<Integer, List<Event>>, Integer, TimeWindow>() {
> 				private static final long serialVersionUID = 3166250579972849440L;
> 				@Override
> 				public void apply(
> 					Integer key, TimeWindow window, Iterable<Event> input,
> 					Collector<Tuple2<Integer, List<Event>>> out) throws Exception {
> 					out.collect(Tuple2.of(key, StreamSupport.stream(input.spliterator(), false).collect(Collectors.toList())));
> 				}
> 			});
> 		DataStreamUtils.reinterpretAsKeyedStream(eventStream4, events -> events.f0)
> 			.flatMap(createSlidingWindowCheckMapper(pt))
> 			.addSink(new PrintSinkFunction<>());
> {code}
> and then in the mapper created by {{createSlidingWindowCheckMapper}} I verify that each event belongs to 3 consecutive windows, keeping the contents of the last window in {{ValueState}}. In a non-failure setup this check runs fine, but it misses a few windows right after a restore.
> {code}
> import java.util.ArrayList;
> import java.util.Collections;
> import java.util.List;
> import java.util.Optional;
> import java.util.function.BinaryOperator;
> 
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.typeutils.ListTypeInfo;
> import org.apache.flink.api.java.typeutils.TupleTypeInfo;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.util.Collector;
> 
> public class SlidingWindowCheckMapper extends RichFlatMapFunction<Tuple2<Integer, List<Event>>, String> {
> 	private static final long serialVersionUID = -744070793650644485L;
> 	/** This value state tracks previously seen events with the number of windows they appeared in. */
> 	private transient ValueState<List<Tuple2<Event, Integer>>> previousWindow;
> 	private final int slideFactor;
> 	SlidingWindowCheckMapper(int slideFactor) {
> 		this.slideFactor = slideFactor;
> 	}
> 	@Override
> 	public void open(Configuration parameters) throws Exception {
> 		ValueStateDescriptor<List<Tuple2<Event, Integer>>> previousWindowDescriptor =
> 			new ValueStateDescriptor<>("previousWindow",
> 				new ListTypeInfo<>(new TupleTypeInfo<>(TypeInformation.of(Event.class), BasicTypeInfo.INT_TYPE_INFO)));
> 		previousWindow = getRuntimeContext().getState(previousWindowDescriptor);
> 	}
> 	@Override
> 	public void flatMap(Tuple2<Integer, List<Event>> value, Collector<String> out) throws Exception {
> 		List<Tuple2<Event, Integer>> previousWindowValues = Optional.ofNullable(previousWindow.value()).orElseGet(
> 			Collections::emptyList);
> 		List<Event> newValues = value.f1;
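> 		// Verify that events inside the window carry consecutive sequence numbers.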
> 		newValues.stream().reduce(new BinaryOperator<Event>() {
> 			@Override
> 			public Event apply(Event event, Event event2) {
> 				if (event2.getSequenceNumber() - 1 != event.getSequenceNumber()) {
> 					out.collect("Alert: events in window out of order!");
> 				}
> 				return event2;
> 			}
> 		});
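> 		// Each event must appear in slideFactor consecutive windows; count its appearances.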
> 		List<Tuple2<Event, Integer>> newWindow = new ArrayList<>();
> 		for (Tuple2<Event, Integer> windowValue : previousWindowValues) {
> 			if (!newValues.contains(windowValue.f0)) {
> 				out.collect(String.format("Alert: event %s did not belong to %d consecutive windows. Event seen so far %d times. Current window: %s",
> 					windowValue.f0,
> 					slideFactor,
> 					windowValue.f1,
> 					value.f1));
> 			} else {
> 				newValues.remove(windowValue.f0);
> 				if (windowValue.f1 + 1 != slideFactor) {
> 					newWindow.add(Tuple2.of(windowValue.f0, windowValue.f1 + 1));
> 				}
> 			}
> 		}
> 		newValues.forEach(e -> newWindow.add(Tuple2.of(e, 1)));
> 		previousWindow.update(newWindow);
> 	}
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)