Posted to issues@flink.apache.org by bowenli86 <gi...@git.apache.org> on 2018/02/14 06:13:51 UTC

[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

GitHub user bowenli86 opened a pull request:

    https://github.com/apache/flink/pull/5481

    [FLINK-8560] Access to the current key in ProcessFunction after keyBy()

    ## What is the purpose of the change
    
    Currently, the key of a keyBy() has to be stored in the processElement method in order to have access to it in the OnTimerContext.
    
    This is inconvenient because the processElement method has to check, for every element, whether the key is already stored and set it if it is not.
    
    A possible solution would be adding OnTimerContext#getCurrentKey() or a similar method. Having it in the open() method might work as well.
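
The workaround described above can be sketched without any Flink dependency. Everything in this block is a hypothetical miniature (`StoredKeySketch`, `scopeTo`, and the two read methods are invented for illustration and are not Flink's API); it only shows why storing the key per element is redundant when the runtime already knows the key it scoped the callback to.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical miniature of a keyed operator; none of these names are Flink's. */
final class StoredKeySketch {
    // Stand-in for keyed state: one slot per key, selected by the runtime's current key.
    private final Map<String, String> keyState = new HashMap<>();
    private String runtimeKey;

    /** The runtime scopes every callback to one key before invoking it. */
    void scopeTo(String key) {
        runtimeKey = key;
    }

    /** Workaround: on every element, store the key unless it is already stored. */
    void processElement(String keyOfElement) {
        keyState.putIfAbsent(runtimeKey, keyOfElement);
    }

    /** Workaround read path: the timer callback recovers the key from state. */
    String keySeenByTimerViaState() {
        return keyState.get(runtimeKey);
    }

    /** Proposed read path: the timer context hands out the key the runtime already holds. */
    String keySeenByTimerViaContext() {
        return runtimeKey;
    }
}
```

Both read paths return the same key once an element has been processed; the second needs neither the per-element check nor the extra state.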
    
    ## Brief change log
    
    added `OnTimerContext#getCurrentKey()`
    
    One limitation is that this implementation of `getCurrentKey()` is currently not strongly typed. Declaring the key's type requires adding a new generic type to `ProcessFunction`, changing the declaration from `ProcessFunction<IN, OUT>` to `ProcessFunction<IN, K, OUT>`. I'm worried this may break users' applications, so I decided to make `getCurrentKey()` return an object. I'd like to discuss the feasibility of strong typing.
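
The typing trade-off mentioned above can be illustrated with a small, dependency-free sketch. The two interfaces and the helper class are hypothetical stand-ins, not Flink classes; they only contrast an `Object`-returning accessor with one whose key type is part of the signature.

```java
// Hypothetical stand-ins for illustration; these are not Flink classes.
interface UntypedTimerContext {
    Object getCurrentKey();
}

interface TypedTimerContext<K> {
    K getCurrentKey();
}

final class KeyTypingSketch {
    /** With an Object-returning accessor the caller must cast; a wrong cast fails only at runtime. */
    static int keyLengthUntyped(UntypedTimerContext ctx) {
        String key = (String) ctx.getCurrentKey();
        return key.length();
    }

    /** With the key type in the signature the compiler checks the access and no cast is needed. */
    static int keyLengthTyped(TypedTimerContext<String> ctx) {
        return ctx.getCurrentKey().length();
    }
}
```

The untyped variant leaves existing two-parameter declarations untouched, at the cost of moving type errors from compile time to run time.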
    
    ## Verifying this change
    
    This change is already covered by existing tests, such as *KeyedProcessOperatorTest*.
    
    ## Does this pull request potentially affect one of the following parts:
    
    none
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes)
      - If yes, how is the feature documented? (docs / JavaDocs)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bowenli86/flink FLINK-8560

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5481.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5481
    
----
commit c5b8a4f27094b88c8641e2bdd30ea0ca65a7a4be
Author: Bowen Li <bo...@...>
Date:   2018-02-13T06:33:06Z

    [FLINK-8560] Access to the current key in ProcessFunction after keyBy()

----


---

[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5481#discussion_r171234411
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---
    @@ -321,19 +324,80 @@ private boolean validateKeyTypeIsHashable(TypeInformation<?> type) {
     	 * @param <R> The type of elements emitted by the {@code ProcessFunction}.
     	 *
     	 * @return The transformed {@link DataStream}.
    +	 *
    +	 * @deprecated Use {@link KeyedStream#process(KeyedProcessFunction, TypeInformation)}
     	 */
    +	@Deprecated
     	@Override
     	@Internal
     	public <R> SingleOutputStreamOperator<R> process(
     			ProcessFunction<T, R> processFunction,
     			TypeInformation<R> outputType) {
     
    -		KeyedProcessOperator<KEY, T, R> operator =
    -				new KeyedProcessOperator<>(clean(processFunction));
    +		LegacyKeyedProcessOperator<K, T, R> operator = new LegacyKeyedProcessOperator<>(clean(processFunction));
     
     		return transform("Process", outputType, operator);
     	}
     
    +	/**
    +	 * Applies the given {@link KeyedProcessFunction} on the input stream, thereby creating a transformed output stream.
    +	 *
    +	 * <p>The function will be called for every element in the input streams and can produce zero
    +	 * or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)}
    +	 * function, this function can also query the time and set timers. When reacting to the firing
    +	 * of set timers the function can directly emit elements and/or register yet more timers.
    +	 *
    +	 * @param keyedProcessFunction The {@link KeyedProcessFunction} that is called for each element in the stream.
    +	 *
    +	 * @param <K> The type of key in {@code KeyedProcessFunction}.
    +	 *
    +	 * @param <R> The type of elements emitted by the {@code KeyedProcessFunction}.
    +	 *
    +	 * @return The transformed {@link DataStream}.
    +	 */
    +	@PublicEvolving
    +	public <K, R> SingleOutputStreamOperator<R> process(KeyedProcessFunction<K, T, R> keyedProcessFunction) {
    +
    +		TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType(
    +				keyedProcessFunction,
    +				KeyedProcessFunction.class,
    +				0,
    --- End diff --
    
    The indices here are not `0` and `1` for input and output type, but `1` and `2`. In the process function it was 0 and 1 because we did not have the key.
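
The index shift can be seen with plain Java reflection. This sketch does not call Flink's `TypeExtractor`; `PlainFn`, `KeyedFn`, and `TypeIndexSketch` are hypothetical stand-ins that mirror only the type-parameter order under discussion.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-ins that mirror only the order of the type parameters.
abstract class PlainFn<I, O> {}

abstract class KeyedFn<K, I, O> {}

final class TypeIndexSketch {
    static final class WordLength extends PlainFn<String, Integer> {}

    static final class KeyedWordLength extends KeyedFn<Long, String, Integer> {}

    /** Returns the actual type argument at the given position of the direct generic superclass. */
    static Type typeArgument(Class<?> subclass, int index) {
        ParameterizedType superType = (ParameterizedType) subclass.getGenericSuperclass();
        return superType.getActualTypeArguments()[index];
    }
}
```

For `WordLength`, positions 0 and 1 are the input and output types. For `KeyedWordLength`, position 0 is the key, so input and output move to positions 1 and 2.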


---

[GitHub] flink issue #5481: [FLINK-8560] Access to the current key in ProcessFunction...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/5481
  
    Thanks @bowenli86 ! I will merge as soon as Travis gives green.


---

[GitHub] flink issue #5481: [FLINK-8560] Access to the current key in ProcessFunction...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5481
  
    I think you can rename the existing one to `LegacyKeyedProcessOperator` or something like this and have a comment that describes the situation.


---

[GitHub] flink issue #5481: [FLINK-8560] Access to the current key in ProcessFunction...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/5481
  
    @kl0u @aljoscha I added the Scala example, and I believe the only build failure in Travis is unrelated to this change.


---

[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5481#discussion_r172135104
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java ---
    @@ -70,21 +69,15 @@ public void open() throws Exception {
     	@Override
     	public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
     		collector.setAbsoluteTimestamp(timer.getTimestamp());
    -		onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
    -		onTimerContext.timer = timer;
    -		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
    -		onTimerContext.timeDomain = null;
    -		onTimerContext.timer = null;
    +		reinitialize(userFunction, TimeDomain.EVENT_TIME, timer);
    --- End diff --
    
    Hate to be picky, but I think the name is a bit misleading and we could probably put all of this in a method `invokeUserTime()` that does what `reinitialize()` and `reset()` do.
    
    @kl0u I think you can quickly fix that when merging.
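
A single method that prepares the context, calls the user function, and clears the context again could look roughly like the sketch below. All names here (`TimerContextSketch`, `TimerCallback`, `OperatorSketch`, `invokeUserFunction`) are hypothetical, and the `try`/`finally` is an addition of this sketch: the removed lines in the diff reset the fields unconditionally after the call.

```java
/** Hypothetical context object; its fields are only meaningful during a callback. */
final class TimerContextSketch {
    String timeDomain;
    Long timerTimestamp;
}

/** Hypothetical user callback. */
interface TimerCallback {
    void onTimer(long timestamp, TimerContextSketch ctx);
}

final class OperatorSketch {
    final TimerContextSketch ctx = new TimerContextSketch();
    private final TimerCallback userFunction;

    OperatorSketch(TimerCallback userFunction) {
        this.userFunction = userFunction;
    }

    /** Sets up the context, invokes the user function, then clears the context. */
    void invokeUserFunction(String timeDomain, long timestamp) {
        ctx.timeDomain = timeDomain;
        ctx.timerTimestamp = timestamp;
        try {
            userFunction.onTimer(timestamp, ctx);
        } finally {
            // Clear the fields so a stored context cannot leak stale values.
            ctx.timeDomain = null;
            ctx.timerTimestamp = null;
        }
    }
}
```

Keeping setup and teardown in one place means both the event-time and the processing-time paths can share it by passing a different time domain.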


---

[GitHub] flink issue #5481: [FLINK-8560] Access to the current key in ProcessFunction...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/5481
  
    cc @pnowojski  @aljoscha 


---

[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5481#discussion_r169571398
  
    --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala ---
    @@ -79,7 +78,34 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
     
         asScalaStream(javaStream.process(processFunction, implicitly[TypeInformation[R]]))
       }
    -  
    +
    +  /**
    +    * Applies the given [[KeyedProcessFunction]] on the input stream, thereby
    --- End diff --
    
    ditto


---

[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5481#discussion_r169060097
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java ---
    @@ -377,119 +375,114 @@ public T getKey(T value) throws Exception {
     		}
     	}
     
    -	private static class QueryingFlatMapFunction extends ProcessFunction<Integer, String> {
    +	private static class QueryingFlatMapFunction extends KeyedProcessFunction<Integer, Integer, String> {
     
     		private static final long serialVersionUID = 1L;
     
    -		private final TimeDomain timeDomain;
    +		private final TimeDomain expectedTimeDomain;
     
     		public QueryingFlatMapFunction(TimeDomain timeDomain) {
    -			this.timeDomain = timeDomain;
    +			this.expectedTimeDomain = timeDomain;
     		}
     
     		@Override
     		public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
    -			if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
    +			if (expectedTimeDomain.equals(TimeDomain.EVENT_TIME)) {
     				out.collect(value + "TIME:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
     			} else {
     				out.collect(value + "TIME:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
     			}
     		}
     
     		@Override
    -		public void onTimer(
    -				long timestamp,
    -				OnTimerContext ctx,
    -				Collector<String> out) throws Exception {
    +		public void onTimer(long timestamp, OnTimerContext<Integer> ctx, Collector<String> out) throws Exception {
    +			// Do nothing
     		}
     	}
     
    -	private static class TriggeringFlatMapFunction extends ProcessFunction<Integer, Integer> {
    +	private static class TriggeringFlatMapFunction extends KeyedProcessFunction<Integer, Integer, Integer> {
     
     		private static final long serialVersionUID = 1L;
     
    -		private final TimeDomain timeDomain;
    +		private final TimeDomain expectedTimeDomain;
    +
    +		static final Integer EXPECTED_KEY = 17;
    --- End diff --
    
    As in the second PR: does this have to be a static field? Or can we initialise it in the constructor? I think it should work with the `expectedKey` set in the constructor as long as this is not an `ITCase` - and it's not, it's using a test harness.
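
Passing the expected values through the constructor might look like the following dependency-free sketch. `TriggeringFunctionSketch` is a hypothetical stand-in for the test function, with the time domain reduced to a string so that the block compiles on its own.

```java
/** Hypothetical stand-in for a test function that checks what its timer callback receives. */
final class TriggeringFunctionSketch {
    private final String expectedTimeDomain;
    private final Integer expectedKey;

    TriggeringFunctionSketch(String expectedTimeDomain, Integer expectedKey) {
        this.expectedTimeDomain = expectedTimeDomain;
        this.expectedKey = expectedKey;
    }

    /** Fails if the key or time domain handed to the timer callback is not the expected one. */
    void onTimer(String actualTimeDomain, Integer actualKey) {
        if (!expectedKey.equals(actualKey)) {
            throw new AssertionError("expected key " + expectedKey + " but got " + actualKey);
        }
        if (!expectedTimeDomain.equals(actualTimeDomain)) {
            throw new AssertionError("expected " + expectedTimeDomain + " but got " + actualTimeDomain);
        }
    }
}
```

With instance fields, each test can construct the function with its own expected key instead of sharing one static constant per class.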


---

[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5481#discussion_r168121129
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java ---
    @@ -397,17 +396,16 @@ public void processElement(Integer value, Context ctx, Collector<String> out) th
     		}
     
     		@Override
    -		public void onTimer(
    -				long timestamp,
    -				OnTimerContext ctx,
    -				Collector<String> out) throws Exception {
    +		public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
     		}
     	}
     
     	private static class TriggeringFlatMapFunction extends ProcessFunction<Integer, Integer> {
     
     		private static final long serialVersionUID = 1L;
     
    +		static final int TEST_VALUE = 17;
    +
     		private final TimeDomain timeDomain;
     
     		public TriggeringFlatMapFunction(TimeDomain timeDomain) {
    --- End diff --
    
    Rename `timeDomain` -> `expectedTimeDomain`, add `expectedKey`, and add an assertion for the expected key to the `onTimer()` method that is triggered in both `KeyedProcessOperatorTest#testEventTimeTimers` and `KeyedProcessOperatorTest#testProcessingTimeTimers`.


---

[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5481#discussion_r171235118
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java ---
    @@ -0,0 +1,130 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.functions.AbstractRichFunction;
    +import org.apache.flink.streaming.api.TimeDomain;
    +import org.apache.flink.streaming.api.TimerService;
    +import org.apache.flink.util.Collector;
    +import org.apache.flink.util.OutputTag;
    +
    +/**
    + * A keyed function that processes elements of a stream.
    + *
    + * <p>For every element in the input stream {@link #processElement(Object, Context, Collector)}
    + * is invoked. This can produce zero or more elements as output. Implementations can also
    + * query the time and set timers through the provided {@link Context}. For firing timers
    + * {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This can again produce
    + * zero or more elements as output and register further timers.
    + *
    + * <p><b>NOTE:</b> Access to keyed state and timers (which are also scoped to a key) is only
    + * available if the {@code KeyedProcessFunction} is applied on a {@code KeyedStream}.
    + *
    + * <p><b>NOTE:</b> A {@code KeyedProcessFunction} is always a
    + * {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the
    + * {@link org.apache.flink.api.common.functions.RuntimeContext} is always available and setup and
    + * teardown methods can be implemented. See
    + * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
    + * and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
    + *
    + * @param <K> Type of the key.
    + * @param <I> Type of the input elements.
    + * @param <O> Type of the output elements.
    + */
    +@PublicEvolving
    +public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	/**
    +	 * Process one element from the input stream.
    +	 *
    +	 * <p>This function can output zero or more elements using the {@link Collector} parameter
    +	 * and also update internal state or set timers using the {@link Context} parameter.
    +	 *
    +	 * @param value The input value.
    +	 * @param ctx A {@link Context} that allows querying the timestamp of the element and getting
    +	 *            a {@link TimerService} for registering timers and querying the time. The
    +	 *            context is only valid during the invocation of this method, do not store it.
    +	 * @param out The collector for returning result values.
    +	 *
    +	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
    +	 *                   to fail and may trigger recovery.
    +	 */
    +	public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
    +
    +	/**
    +	 * Called when a timer set using {@link TimerService} fires.
    +	 *
    +	 * @param timestamp The timestamp of the firing timer.
    +	 * @param ctx An {@link OnTimerContext} that allows querying the timestamp, the {@link TimeDomain}, and the key
    +	 *            of the firing timer and getting a {@link TimerService} for registering timers and querying the time.
    +	 *            The context is only valid during the invocation of this method, do not store it.
    +	 * @param out The collector for returning result values.
    +	 *
    +	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
    +	 *                   to fail and may trigger recovery.
    +	 */
    +	public void onTimer(long timestamp, OnTimerContext<K> ctx, Collector<O> out) throws Exception {}
    +
    +	/**
    +	 * Information available in an invocation of {@link #processElement(Object, Context, Collector)}
    +	 * or {@link #onTimer(long, OnTimerContext, Collector)}.
    +	 */
    +	public abstract class Context {
    +
    +		/**
    +		 * Timestamp of the element currently being processed or timestamp of a firing timer.
    +		 *
    +		 * <p>This might be {@code null}, for example if the time characteristic of your program
    +		 * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
    +		 */
    +		public abstract Long timestamp();
    +
    +		/**
    +		 * A {@link TimerService} for querying time and registering timers.
    +		 */
    +		public abstract TimerService timerService();
    +
    +		/**
    +		 * Emits a record to the side output identified by the {@link OutputTag}.
    +		 *
    +		 * @param outputTag the {@code OutputTag} that identifies the side output to emit to.
    +		 * @param value The record to emit.
    +		 */
    +		public abstract <X> void output(OutputTag<X> outputTag, X value);
    +	}
    +
    +	/**
    +	 * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
    +	 */
    +	public abstract class OnTimerContext<K> extends Context {
    --- End diff --
    
    Here you do not have to specify the `<K>` as this is still visible from the containing class.
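
The scoping rule behind this comment can be shown with a short sketch. The class names are hypothetical and much smaller than the real `KeyedProcessFunction`; the point is only that a non-static inner class can use the type parameter `K` of its enclosing class without declaring its own.

```java
final class InnerGenericsSketch {
    /** Hypothetical miniature of a keyed function with an inner context class. */
    abstract static class KeyedFn<K, I, O> {
        // Non-static inner class: K of the enclosing class is in scope here.
        abstract class OnTimerContext {
            abstract K getCurrentKey();
        }

        OnTimerContext contextFor(final K key) {
            return new OnTimerContext() {
                @Override
                K getCurrentKey() {
                    return key;
                }
            };
        }
    }

    static final class ByUserId extends KeyedFn<Integer, String, String> {}

    static Integer demo() {
        ByUserId fn = new ByUserId();
        // No cast and no extra type argument: the key type comes from ByUserId's superclass.
        return fn.contextFor(17).getCurrentKey();
    }
}
```

Declaring `OnTimerContext<K>` instead would introduce a second, unrelated `K` that shadows the outer one, and every use site would then have to write `OnTimerContext<Integer>`.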


---

[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5481#discussion_r171233273
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---
    @@ -84,18 +86,18 @@
      * elements that have the same key.
      *
      * @param <T> The type of the elements in the Keyed Stream.
    - * @param <KEY> The type of the key in the Keyed Stream.
    + * @param <K> The type of the key in the Keyed Stream.
    --- End diff --
    
    Revert the renaming from `KEY` to `K`.


---

[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5481#discussion_r169060125
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java ---
    @@ -377,119 +375,114 @@ public T getKey(T value) throws Exception {
     		}
     	}
     
    -	private static class QueryingFlatMapFunction extends ProcessFunction<Integer, String> {
    +	private static class QueryingFlatMapFunction extends KeyedProcessFunction<Integer, Integer, String> {
     
     		private static final long serialVersionUID = 1L;
     
    -		private final TimeDomain timeDomain;
    +		private final TimeDomain expectedTimeDomain;
     
     		public QueryingFlatMapFunction(TimeDomain timeDomain) {
    -			this.timeDomain = timeDomain;
    +			this.expectedTimeDomain = timeDomain;
     		}
     
     		@Override
     		public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
    -			if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
    +			if (expectedTimeDomain.equals(TimeDomain.EVENT_TIME)) {
     				out.collect(value + "TIME:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
     			} else {
     				out.collect(value + "TIME:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
     			}
     		}
     
     		@Override
    -		public void onTimer(
    -				long timestamp,
    -				OnTimerContext ctx,
    -				Collector<String> out) throws Exception {
    +		public void onTimer(long timestamp, OnTimerContext<Integer> ctx, Collector<String> out) throws Exception {
    +			// Do nothing
     		}
     	}
     
    -	private static class TriggeringFlatMapFunction extends ProcessFunction<Integer, Integer> {
    +	private static class TriggeringFlatMapFunction extends KeyedProcessFunction<Integer, Integer, Integer> {
     
     		private static final long serialVersionUID = 1L;
     
    -		private final TimeDomain timeDomain;
    +		private final TimeDomain expectedTimeDomain;
    +
    +		static final Integer EXPECTED_KEY = 17;
     
     		public TriggeringFlatMapFunction(TimeDomain timeDomain) {
    -			this.timeDomain = timeDomain;
    +			this.expectedTimeDomain = timeDomain;
     		}
     
     		@Override
     		public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
     			out.collect(value);
    -			if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
    +			if (expectedTimeDomain.equals(TimeDomain.EVENT_TIME)) {
     				ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
     			} else {
     				ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
     			}
     		}
     
     		@Override
    -		public void onTimer(
    -				long timestamp,
    -				OnTimerContext ctx,
    -				Collector<Integer> out) throws Exception {
    -
    -			assertEquals(this.timeDomain, ctx.timeDomain());
    +		public void onTimer(long timestamp, OnTimerContext<Integer> ctx, Collector<Integer> out) throws Exception {
    +			assertEquals(EXPECTED_KEY, ctx.getCurrentKey());
    +			assertEquals(expectedTimeDomain, ctx.timeDomain());
     			out.collect(1777);
     		}
     	}
     
    -	private static class TriggeringStatefulFlatMapFunction extends ProcessFunction<Integer, String> {
    +	private static class TriggeringStatefulFlatMapFunction extends KeyedProcessFunction<Integer, Integer, String> {
     
     		private static final long serialVersionUID = 1L;
     
     		private final ValueStateDescriptor<Integer> state =
     				new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE);
     
    -		private final TimeDomain timeDomain;
    +		private final TimeDomain expectedTimeDomain;
     
     		public TriggeringStatefulFlatMapFunction(TimeDomain timeDomain) {
    -			this.timeDomain = timeDomain;
    +			this.expectedTimeDomain = timeDomain;
     		}
     
     		@Override
     		public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
     			out.collect("INPUT:" + value);
     			getRuntimeContext().getState(state).update(value);
    -			if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
    +			if (expectedTimeDomain.equals(TimeDomain.EVENT_TIME)) {
     				ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
     			} else {
     				ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
     			}
     		}
     
     		@Override
    -		public void onTimer(
    -				long timestamp,
    -				OnTimerContext ctx,
    -				Collector<String> out) throws Exception {
    -			assertEquals(this.timeDomain, ctx.timeDomain());
    +		public void onTimer(long timestamp, OnTimerContext<Integer> ctx, Collector<String> out) throws Exception {
    +			System.out.println(ctx.getCurrentKey());
    +			assertEquals(expectedTimeDomain, ctx.timeDomain());
     			out.collect("STATE:" + getRuntimeContext().getState(state).value());
     		}
     	}
     
    -	private static class BothTriggeringFlatMapFunction extends ProcessFunction<Integer, String> {
    +	private static class BothTriggeringFlatMapFunction extends KeyedProcessFunction<Integer, Integer, String> {
     
     		private static final long serialVersionUID = 1L;
     
    +		static final Integer EXPECTED_KEY = 5;
    --- End diff --
    
    ditto


---

[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5481#discussion_r171236298
  
    --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala ---
    @@ -54,21 +54,20 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
       // ------------------------------------------------------------------------
     
       /**
    -    * Applies the given [[ProcessFunction]] on the input stream, thereby
    -    * creating a transformed output stream.
    -    *
    -    * The function will be called for every element in the stream and can produce
    -    * zero or more output. The function can also query the time and set timers. When
    -    * reacting to the firing of set timers the function can emit yet more elements.
    -    *
    -    * The function will be called for every element in the input streams and can produce zero
    -    * or more output elements. Contrary to the [[DataStream#flatMap(FlatMapFunction)]]
    -    * function, this function can also query the time and set timers. When reacting to the firing
    -    * of set timers the function can directly emit elements and/or register yet more timers.
    -    *
    -    * @param processFunction The [[ProcessFunction]] that is called for each element
    -    *                   in the stream.
    -    */
    +   * Applies the given [[ProcessFunction]] on the input stream, thereby
    +   * creating a transformed output stream.
    +   *
    +   * The function will be called for every element in the stream and can produce
    +   * zero or more output. The function can also query the time and set timers. When
    +   * reacting to the firing of set timers the function can emit yet more elements.
    +   *
    +   * The function will be called for every element in the input streams and can produce zero
    +   * or more output elements. Contrary to the [[DataStream#flatMap(FlatMapFunction)]]
    +   * function, this function can also query the time and set timers. When reacting to the firing
    +   * of set timers the function can directly emit elements and/or register yet more timers.
    +   *
    +   * @param processFunction The [[ProcessFunction]] that is called for each element in the stream.
    +   */
    --- End diff --
    
    Revert all reformattings (indent by 1 space) and also add the `@deprecated` annotation with the correct, non-deprecated alternative, as done in the corresponding Java class.


---

[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5481#discussion_r169571457
  
    --- Diff: flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala ---
    @@ -473,6 +496,28 @@ class DataStreamTest extends AbstractTestBase {
         assert(getOperatorForDataStream(flatMapped).isInstanceOf[ProcessOperator[_, _]])
       }
     
    +  /**
    +    * Verify that a [[DataStream.process(KeyedProcessFunction)]] call is correctly
    --- End diff --
    
    ditto


---

[GitHub] flink issue #5481: [FLINK-8560] Access to the current key in ProcessFunction...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5481
  
    @bowenli86 You can take a look at `KeyedBroadcastProcessFunction`. I think we could also add a `KeyedProcessFunction`, that has the key type in the signature. This would allow exposing the key in `onTimer()` and other methods.


---

[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

Posted by juergenthomann <gi...@git.apache.org>.
Github user juergenthomann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5481#discussion_r169077241
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---
    @@ -654,6 +648,63 @@ public ExecutionConfig getExecutionConfig() {
     		return transform("Process", outputType, operator);
     	}
     
    +	/**
    +	 * Applies the given {@link KeyedProcessFunction} on the input stream, thereby creating a transformed output stream.
    +	 *
    +	 * <p>The function will be called for every element in the input streams and can produce zero
    +	 * or more output elements.
    +	 *
    +	 * @param keyedProcessFunction The {@link ProcessFunction} that is called for each element in the stream.
    +	 *
    +	 * @param <K> The type of key in {@code KeyedProcessFunction}.
    +	 *
    +	 * @param <R> The type of elements emitted by the {@code PKeyedProcessFunction}.
    --- End diff --
    
    Could this be a typo with **P**KeyedProcessFunction instead of KeyedProcessFunction?


---

[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5481#discussion_r169571436
  
    --- Diff: flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala ---
    @@ -448,11 +447,35 @@ class DataStreamTest extends AbstractTestBase {
         val flatMapped = src.keyBy(x => x).process(processFunction)
     
         assert(processFunction == getFunctionForDataStream(flatMapped))
    +    assert(getOperatorForDataStream(flatMapped).isInstanceOf[LegacyKeyedProcessOperator[_, _, _]])
    +  }
    +
    +  /**
    +    * Verify that a [[KeyedStream.process(KeyedProcessFunction)]] call is correctly
    --- End diff --
    
    ditto


---

[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5481#discussion_r172136142
  
    --- Diff: docs/dev/stream/operators/process_function.md ---
    @@ -242,4 +242,17 @@ class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String
     the current processing time as event-time timestamp. This behavior is very subtle and might not be noticed by users. Well, it's
     harmful because processing-time timestamps are indeterministic and not aligned with watermarks. Besides, user-implemented logic
     depends on this wrong timestamp highly likely is unintendedly faulty. So we've decided to fix it. Upon upgrading to 1.4.0, Flink jobs
    -that are using this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic.
    \ No newline at end of file
    +that are using this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic.
    +
    +## The KeyedProcessFunction
    +
    +`KeyedProcessFunction`, as an extension of `ProcessFunction`, gives access to the key of timers in its `onTimer(...)`
    +method.
    +
    +{% highlight java %}
    --- End diff --
    
    @bowenli86 As soon as the scala example is added, I can take care of the other two comments and merge! Let me know when you update the PR, and thanks for the work!


---

[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5481#discussion_r171233924
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---
    @@ -321,19 +324,80 @@ private boolean validateKeyTypeIsHashable(TypeInformation<?> type) {
     	 * @param <R> The type of elements emitted by the {@code ProcessFunction}.
     	 *
     	 * @return The transformed {@link DataStream}.
    +	 *
    +	 * @deprecated Use {@link KeyedStream#process(KeyedProcessFunction, TypeInformation)}
     	 */
    +	@Deprecated
     	@Override
     	@Internal
     	public <R> SingleOutputStreamOperator<R> process(
     			ProcessFunction<T, R> processFunction,
     			TypeInformation<R> outputType) {
     
    -		KeyedProcessOperator<KEY, T, R> operator =
    -				new KeyedProcessOperator<>(clean(processFunction));
    +		LegacyKeyedProcessOperator<K, T, R> operator = new LegacyKeyedProcessOperator<>(clean(processFunction));
     
     		return transform("Process", outputType, operator);
     	}
     
    +	/**
    +	 * Applies the given {@link KeyedProcessFunction} on the input stream, thereby creating a transformed output stream.
    +	 *
    +	 * <p>The function will be called for every element in the input streams and can produce zero
    +	 * or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)}
    +	 * function, this function can also query the time and set timers. When reacting to the firing
    +	 * of set timers the function can directly emit elements and/or register yet more timers.
    +	 *
    +	 * @param keyedProcessFunction The {@link KeyedProcessFunction} that is called for each element in the stream.
    +	 *
    +	 * @param <K> The type of key in {@code KeyedProcessFunction}.
    +	 *
    +	 * @param <R> The type of elements emitted by the {@code KeyedProcessFunction}.
    +	 *
    +	 * @return The transformed {@link DataStream}.
    +	 */
    +	@PublicEvolving
    +	public <K, R> SingleOutputStreamOperator<R> process(KeyedProcessFunction<K, T, R> keyedProcessFunction) {
    +
    --- End diff --
    
    The signature should be:
    ```
    public <R> SingleOutputStreamOperator<R> process(KeyedProcessFunction<KEY, T, R> keyedProcessFunction) {
    ```
    
    You do not have to re-define the type of the key (the `<K, R>` in the beginning) as we are already in a `KeyedStream` with an already defined type of key.
    
    Also remove the corresponding part in the `javadoc`.
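
    For illustration, a minimal Java sketch of why only `<R>` needs to be declared on the method. `Keyed` and `KeyedFn` are hypothetical stand-ins for `KeyedStream` and `KeyedProcessFunction`, not Flink's actual classes, and the method body is simplified to a direct call:

    ```java
    import java.util.function.Function;

    final class KeyTypeSketch {

        /** Hypothetical stand-in for KeyedProcessFunction<K, I, O>. */
        interface KeyedFn<K, I, O> {
            O apply(K key, I value);
        }

        /** Hypothetical stand-in for KeyedStream<T, KEY>. */
        static final class Keyed<T, KEY> {
            private final Function<T, KEY> keySelector;

            Keyed(Function<T, KEY> keySelector) {
                this.keySelector = keySelector;
            }

            // Only <R> is declared on the method. KEY comes from the class, so the
            // function's key type has to match the key type of the stream.
            // Declaring <K, R> here instead would introduce a new type variable K
            // that is unrelated to KEY.
            <R> R process(T element, KeyedFn<KEY, T, R> fn) {
                return fn.apply(keySelector.apply(element), element);
            }
        }

        static String demo() {
            // A "stream" of strings keyed by their length.
            Keyed<String, Integer> byLength = new Keyed<>(String::length);
            KeyedFn<Integer, String, String> fn = (key, value) -> key + ":" + value;
            return byLength.process("flink", fn);
        }

        public static void main(String[] args) {
            System.out.println(demo());
        }
    }
    ```

    With this shape, passing a function whose key type differs from the stream's key type is rejected at compile time.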


---

[GitHub] flink issue #5481: [FLINK-8560] Access to the current key in ProcessFunction...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/5481
  
    Thanks for the review and suggestions. Your comment on `DataStream#process(KeyedProcessFunction)` makes sense; I've removed it.
    
    (btw, I feel https://github.com/apache/flink/pull/5500 is more urgent than this PR. Can you take a look at that one?)


---

[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5481


---

[GitHub] flink issue #5481: [FLINK-8560] Access to the current key in ProcessFunction...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/5481
  
    @kl0u I added the comments for `@deprecated` in the javadoc. Let me know if you can merge the two related PRs. Thanks


---

[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5481#discussion_r171235356
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java ---
    @@ -0,0 +1,130 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.functions.AbstractRichFunction;
    +import org.apache.flink.streaming.api.TimeDomain;
    +import org.apache.flink.streaming.api.TimerService;
    +import org.apache.flink.util.Collector;
    +import org.apache.flink.util.OutputTag;
    +
    +/**
    + * A keyed function that processes elements of a stream.
    + *
    + * <p>For every element in the input stream {@link #processElement(Object, Context, Collector)}
    + * is invoked. This can produce zero or more elements as output. Implementations can also
    + * query the time and set timers through the provided {@link Context}. For firing timers
    + * {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This can again produce
    + * zero or more elements as output and register further timers.
    + *
    + * <p><b>NOTE:</b> Access to keyed state and timers (which are also scoped to a key) is only
    + * available if the {@code KeyedProcessFunction} is applied on a {@code KeyedStream}.
    + *
    + * <p><b>NOTE:</b> A {@code KeyedProcessFunction} is always a
    + * {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the
    + * {@link org.apache.flink.api.common.functions.RuntimeContext} is always available and setup and
    + * teardown methods can be implemented. See
    + * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
    + * and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
    + *
    + * @param <K> Type of the key.
    + * @param <I> Type of the input elements.
    + * @param <O> Type of the output elements.
    + */
    +@PublicEvolving
    +public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	/**
    +	 * Process one element from the input stream.
    +	 *
    +	 * <p>This function can output zero or more elements using the {@link Collector} parameter
    +	 * and also update internal state or set timers using the {@link Context} parameter.
    +	 *
    +	 * @param value The input value.
    +	 * @param ctx A {@link Context} that allows querying the timestamp of the element and getting
    +	 *            a {@link TimerService} for registering timers and querying the time. The
    +	 *            context is only valid during the invocation of this method, do not store it.
    +	 * @param out The collector for returning result values.
    +	 *
    +	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
    +	 *                   to fail and may trigger recovery.
    +	 */
    +	public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
    +
    +	/**
    +	 * Called when a timer set using {@link TimerService} fires.
    +	 *
    +	 * @param timestamp The timestamp of the firing timer.
    +	 * @param ctx An {@link OnTimerContext} that allows querying the timestamp, the {@link TimeDomain}, and the key
    +	 *            of the firing timer and getting a {@link TimerService} for registering timers and querying the time.
    +	 *            The context is only valid during the invocation of this method, do not store it.
    +	 * @param out The collector for returning result values.
    +	 *
    +	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
    +	 *                   to fail and may trigger recovery.
    +	 */
    +	public void onTimer(long timestamp, OnTimerContext<K> ctx, Collector<O> out) throws Exception {}
    +
    --- End diff --
    
    Remove the `<K>` (see comment below).


---

[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5481#discussion_r171235653
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java ---
    @@ -132,15 +139,15 @@ public TimerService timerService() {
     		}
     	}
     
    -	private class OnTimerContextImpl extends ProcessFunction<IN, OUT>.OnTimerContext{
    +	private class OnTimerContextImpl extends KeyedProcessFunction<K, IN, OUT>.OnTimerContext<K> {
     
    --- End diff --
    
    With the proposed changes you can also remove the `<K>` in the `.OnTimerContext<K>`.
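
    For illustration, a minimal Java sketch of why the inner context class does not need its own `<K>`: a non-static inner class already sees the type parameters of its enclosing class. `KeyedFn` and `Operator` are hypothetical stand-ins for `KeyedProcessFunction` and its operator, and the `Collector` argument is left out to keep the sketch short:

    ```java
    final class InnerContextSketch {

        /** Hypothetical stand-in for KeyedProcessFunction<K, I, O>. */
        abstract static class KeyedFn<K, I, O> {

            // Non-static inner class: K is in scope from the enclosing class,
            // so OnTimerContext needs no type parameter of its own.
            abstract class OnTimerContext {
                abstract K getCurrentKey();
            }

            abstract O onTimer(long timestamp, OnTimerContext ctx);
        }

        /** Hypothetical stand-in for the operator that drives the function. */
        static final class Operator<K, I, O> {
            private final KeyedFn<K, I, O> function;
            private K currentKey;

            Operator(KeyedFn<K, I, O> function) {
                this.function = function;
            }

            void setCurrentKey(K key) {
                this.currentKey = key;
            }

            O fireTimer(long timestamp) {
                return function.onTimer(timestamp, new OnTimerContextImpl(function));
            }

            // Extends the inner class of a concrete parameterization; no <K> after
            // OnTimerContext is required.
            private final class OnTimerContextImpl extends KeyedFn<K, I, O>.OnTimerContext {
                OnTimerContextImpl(KeyedFn<K, I, O> function) {
                    // Binds the enclosing KeyedFn instance of the inner superclass.
                    function.super();
                }

                @Override
                K getCurrentKey() {
                    return currentKey;
                }
            }
        }

        static String demo() {
            KeyedFn<String, Integer, String> fn = new KeyedFn<String, Integer, String>() {
                @Override
                String onTimer(long timestamp, OnTimerContext ctx) {
                    // getCurrentKey() is typed as String here, without a cast.
                    return ctx.getCurrentKey() + "@" + timestamp;
                }
            };
            Operator<String, Integer, String> operator = new Operator<>(fn);
            operator.setCurrentKey("user-1");
            return operator.fireTimer(42L);
        }

        public static void main(String[] args) {
            System.out.println(demo());
        }
    }
    ```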


---

[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5481#discussion_r169571208
  
    --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---
    @@ -666,18 +666,18 @@ class DataStream[T](stream: JavaStream[T]) {
       }
     
       /**
    -   * Applies the given [[ProcessFunction]] on the input stream, thereby
    -   * creating a transformed output stream.
    -   *
    -   * The function will be called for every element in the stream and can produce
    -   * zero or more output.
    -   *
    -   * @param processFunction The [[ProcessFunction]] that is called for each element
    -   *                   in the stream.
    -   */
    +    * Applies the given [[ProcessFunction]] on the input stream, thereby
    --- End diff --
    
    Can you revert this formatting? I think the proper Javadoc-style formatting is what was there before:
    ```
    /**
     *
     */
    ```
    instead of:
    ```
    /**
      *
      */
    ```


---

[GitHub] flink issue #5481: [FLINK-8560] Access to the current key in ProcessFunction...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/5481
  
    @aljoscha @pnowojski  Guys, quick question. I'm about to develop `KeyedProcessFunction` and its operator in a keyed stream. But I found there's already a `KeyedProcessOperator` which is for `ProcessFunction` in a keyed stream. Shall I create a new operator named something like `KeyedProcessFunctionOperator`?
    
    Thanks,
    Bowen


---

[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5481#discussion_r168265503
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---
    @@ -162,7 +162,7 @@ class GroupAggProcessFunction(
     
       override def onTimer(
           timestamp: Long,
    -      ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
    +      ctx: ProcessFunction[CRow, CRow]#OnTimerContext[_],
    --- End diff --
    
    yes, I think I'll create a `KeyedProcessFunction` as Aljoscha suggested, and we probably don't need these scala changes any more.


---

[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5481#discussion_r172135390
  
    --- Diff: docs/dev/stream/operators/process_function.md ---
    @@ -242,4 +242,17 @@ class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String
     the current processing time as event-time timestamp. This behavior is very subtle and might not be noticed by users. Well, it's
     harmful because processing-time timestamps are indeterministic and not aligned with watermarks. Besides, user-implemented logic
     depends on this wrong timestamp highly likely is unintendedly faulty. So we've decided to fix it. Upon upgrading to 1.4.0, Flink jobs
    -that are using this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic.
    \ No newline at end of file
    +that are using this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic.
    +
    +## The KeyedProcessFunction
    +
    +`KeyedProcessFunction`, as an extension of `ProcessFunction`, gives access to the key of timers in its `onTimer(...)`
    +method.
    +
    +{% highlight java %}
    --- End diff --
    
    Maybe also add Scala example code.


---

[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5481#discussion_r172133573
  
    --- Diff: docs/dev/stream/operators/process_function.md ---
    @@ -242,4 +242,17 @@ class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String
     the current processing time as event-time timestamp. This behavior is very subtle and might not be noticed by users. Well, it's
     harmful because processing-time timestamps are indeterministic and not aligned with watermarks. Besides, user-implemented logic
     depends on this wrong timestamp highly likely is unintendedly faulty. So we've decided to fix it. Upon upgrading to 1.4.0, Flink jobs
    -that are using this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic.
    \ No newline at end of file
    +that are using this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic.
    +
    +## The KeyedProcessFunction
    +
    +`KeyedProcessFunction`, as an extension of `ProcessFunction`, gives access to the key of timers in its `onTimer(...)`
    +method.
    +
    +{% highlight java %}
    +@Override
    +public void onTimer(long timestamp, OnTimerContext<K> ctx, Collector<OUT> out) throws Exception {
    --- End diff --
    
    I believe this is now `public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out)`, right? @kl0u you could fix this while merging.


---

[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5481#discussion_r171237037
  
    --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala ---
    @@ -79,7 +78,34 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
     
         asScalaStream(javaStream.process(processFunction, implicitly[TypeInformation[R]]))
       }
    -  
    +
    +  /**
    +   * Applies the given [[KeyedProcessFunction]] on the input stream, thereby
    +   * creating a transformed output stream.
    +   *
    +   * The function will be called for every element in the stream and can produce
    +   * zero or more output. The function can also query the time and set timers. When
    +   * reacting to the firing of set timers the function can emit yet more elements.
    +   *
    +   * The function will be called for every element in the input streams and can produce zero
    +   * or more output elements. Contrary to the [[DataStream#flatMap(FlatMapFunction)]]
    +   * function, this function can also query the time and set timers. When reacting to the firing
    +   * of set timers the function can directly emit elements and/or register yet more timers.
    +   *
    +   * @param keyedProcessFunction The [[KeyedProcessFunction]] that is called for each element
    +   *                             in the stream.
    +   */
    +  @PublicEvolving
    +  def process[K, R: TypeInformation](
    --- End diff --
    
    As in Java, you do not need to redefine the `K` here, so you can remove it: `def process[R: TypeInformation](`...


---

[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5481#discussion_r169573342
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---
    @@ -654,6 +648,63 @@ public ExecutionConfig getExecutionConfig() {
     		return transform("Process", outputType, operator);
     	}
     
    +	/**
    +	 * Applies the given {@link KeyedProcessFunction} on the input stream, thereby creating a transformed output stream.
    +	 *
    +	 * <p>The function will be called for every element in the input streams and can produce zero
    +	 * or more output elements.
    +	 *
    +	 * @param keyedProcessFunction The {@link ProcessFunction} that is called for each element in the stream.
    +	 *
    +	 * @param <K> The type of key in {@code KeyedProcessFunction}.
    +	 *
    +	 * @param <R> The type of elements emitted by the {@code KeyedProcessFunction}.
    +	 *
    +	 * @return The transformed {@link DataStream}.
    +	 */
    +	@PublicEvolving
    +	public <K, R> SingleOutputStreamOperator<R> process(KeyedProcessFunction<K, T, R> keyedProcessFunction) {
    --- End diff --
    
    Does it make sense to add `process` with a `KeyedProcessFunction` on a non-keyed `DataStream`?


---

[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5481#discussion_r169571197
  
    --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---
    @@ -686,6 +686,27 @@ class DataStream[T](stream: JavaStream[T]) {
         asScalaStream(javaStream.process(processFunction, implicitly[TypeInformation[R]]))
       }
     
    +  /**
    +    * Applies the given [[KeyedProcessFunction]] on the input stream, thereby
    --- End diff --
    
    ditto


---

[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5481#discussion_r171234582
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---
    @@ -321,19 +324,80 @@ private boolean validateKeyTypeIsHashable(TypeInformation<?> type) {
     	 * @param <R> The type of elements emitted by the {@code ProcessFunction}.
     	 *
     	 * @return The transformed {@link DataStream}.
    +	 *
    +	 * @deprecated Use {@link KeyedStream#process(KeyedProcessFunction, TypeInformation)}
     	 */
    +	@Deprecated
     	@Override
     	@Internal
     	public <R> SingleOutputStreamOperator<R> process(
     			ProcessFunction<T, R> processFunction,
     			TypeInformation<R> outputType) {
     
    -		KeyedProcessOperator<KEY, T, R> operator =
    -				new KeyedProcessOperator<>(clean(processFunction));
    +		LegacyKeyedProcessOperator<K, T, R> operator = new LegacyKeyedProcessOperator<>(clean(processFunction));
     
     		return transform("Process", outputType, operator);
     	}
     
    +	/**
    +	 * Applies the given {@link KeyedProcessFunction} on the input stream, thereby creating a transformed output stream.
    +	 *
    +	 * <p>The function will be called for every element in the input streams and can produce zero
    +	 * or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)}
    +	 * function, this function can also query the time and set timers. When reacting to the firing
    +	 * of set timers the function can directly emit elements and/or register yet more timers.
    +	 *
    +	 * @param keyedProcessFunction The {@link KeyedProcessFunction} that is called for each element in the stream.
    +	 *
    +	 * @param <K> The type of key in {@code KeyedProcessFunction}.
    +	 *
    +	 * @param <R> The type of elements emitted by the {@code KeyedProcessFunction}.
    +	 *
    +	 * @return The transformed {@link DataStream}.
    +	 */
    +	@PublicEvolving
    +	public <K, R> SingleOutputStreamOperator<R> process(KeyedProcessFunction<K, T, R> keyedProcessFunction) {
    +
    +		TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType(
    +				keyedProcessFunction,
    +				KeyedProcessFunction.class,
    +				0,
    +				1,
    +				TypeExtractor.NO_INDEX,
    +				TypeExtractor.NO_INDEX,
    +				getType(),
    +				Utils.getCallLocationName(),
    +				true);
    +
    +		return process(keyedProcessFunction, outType);
    +	}
    +
    +	/**
    +	 * Applies the given {@link KeyedProcessFunction} on the input stream, thereby creating a transformed output stream.
    +	 *
    +	 * <p>The function will be called for every element in the input streams and can produce zero
    +	 * or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)}
    +	 * function, this function can also query the time and set timers. When reacting to the firing
    +	 * of set timers the function can directly emit elements and/or register yet more timers.
    +	 *
    +	 * @param keyedProcessFunction The {@link KeyedProcessFunction} that is called for each element in the stream.
    +	 *
    +	 * @param outputType {@link TypeInformation} for the result type of the function.
    +	 *
    +	 * @param <K> The type of key in {@code KeyedProcessFunction}.
    +	 *
    +	 * @param <R> The type of elements emitted by the {@code KeyedProcessFunction}.
    +	 *
    +	 * @return The transformed {@link DataStream}.
    +	 */
    +	@Internal
    +	public <K, R> SingleOutputStreamOperator<R> process(
    +			KeyedProcessFunction<K, T, R> keyedProcessFunction,
    --- End diff --
    
    Same here, you do not have to redefine the type of the key.


---

[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5481#discussion_r171566773
  
    --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala ---
    @@ -66,9 +66,9 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
         * function, this function can also query the time and set timers. When reacting to the firing
         * of set timers the function can directly emit elements and/or register yet more timers.
         *
    -    * @param processFunction The [[ProcessFunction]] that is called for each element
    -    *                   in the stream.
    +    * @param processFunction The [[ProcessFunction]] that is called for each element in the stream.
    --- End diff --
    
    Please also add that the user now should use the new `KeyedProcessFunction` instead.


---

[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5481#discussion_r168122242
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---
    @@ -162,7 +162,7 @@ class GroupAggProcessFunction(
     
       override def onTimer(
           timestamp: Long,
    -      ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
    +      ctx: ProcessFunction[CRow, CRow]#OnTimerContext[_],
    --- End diff --
    
    I'm not a scala expert, but is this change somehow related to adding the `getCurrentKey()` method? 


---

[GitHub] flink issue #5481: [FLINK-8560] Access to the current key in ProcessFunction...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/5481
  
    cc @pnowojski 


---

[GitHub] flink issue #5481: [FLINK-8560] Access to the current key in ProcessFunction...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/5481
  
    @aljoscha Great idea! I think that's the way to go! Thanks for bringing it up!


---

[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5481#discussion_r171237136
  
    --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala ---
    @@ -79,7 +78,34 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
     
         asScalaStream(javaStream.process(processFunction, implicitly[TypeInformation[R]]))
       }
    -  
    +
    +  /**
    +   * Applies the given [[KeyedProcessFunction]] on the input stream, thereby
    +   * creating a transformed output stream.
    +   *
    +   * The function will be called for every element in the stream and can produce
    +   * zero or more output. The function can also query the time and set timers. When
    +   * reacting to the firing of set timers the function can emit yet more elements.
    +   *
    +   * The function will be called for every element in the input streams and can produce zero
    +   * or more output elements. Contrary to the [[DataStream#flatMap(FlatMapFunction)]]
    +   * function, this function can also query the time and set timers. When reacting to the firing
    +   * of set timers the function can directly emit elements and/or register yet more timers.
    +   *
    +   * @param keyedProcessFunction The [[KeyedProcessFunction]] that is called for each element
    +   *                             in the stream.
    +   */
    +  @PublicEvolving
    +  def process[K, R: TypeInformation](
    +    keyedProcessFunction: KeyedProcessFunction[K, T, R]): DataStream[R] = {
    +
    +    if (keyedProcessFunction == null) {
    +      throw new NullPointerException("ProcessFunction must not be null.")
    +    }
    --- End diff --
    
    The message now should be `"KeyedProcessFunction must not be null."`
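
    For illustration, a minimal Java sketch of the same check with the corrected message. `requireFunction` and `messageFor` are hypothetical helpers, not part of the PR, and the Java equivalent is used here instead of the Scala `if`/`throw` from the diff:

    ```java
    import java.util.Objects;

    final class NullCheckSketch {

        // Hypothetical helper: rejects a null function with a message that names
        // the type the caller is actually expected to pass.
        static <F> F requireFunction(F keyedProcessFunction) {
            return Objects.requireNonNull(
                    keyedProcessFunction, "KeyedProcessFunction must not be null.");
        }

        // Returns the exception message for a null argument, or "ok" otherwise.
        static String messageFor(Object function) {
            try {
                requireFunction(function);
                return "ok";
            } catch (NullPointerException e) {
                return e.getMessage();
            }
        }

        public static void main(String[] args) {
            System.out.println(messageFor(null));
        }
    }
    ```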


---