Posted to issues@flink.apache.org by aljoscha <gi...@git.apache.org> on 2015/10/02 16:53:35 UTC

[GitHub] flink pull request: Stream API Refactoring

GitHub user aljoscha opened a pull request:

    https://github.com/apache/flink/pull/1215

    Stream API Refactoring

    This is a WIP of the refactoring. I still want to add Javadocs and a new join operator based on tagged union that uses the new windowing operators.
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink api-rework

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1215.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1215
    
----
commit 560090348f8df93160501a55051a913625c7215e
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2015-09-30T13:05:13Z

    [FLINK-2666] Add timestamp extraction operator
    
    This adds a user function TimestampExtractor and an operator
    ExtractTimestampsOperator that can be used to extract timestamps and
    attach them to elements to do event-time windowing.
    
    Users can either use an AscendingTimestampExtractor, which assumes that
    timestamps are monotonically increasing (this allows it to derive the
    watermark very easily), or a TimestampExtractor, where they also have
    to provide the watermark.
    
    The ExtractTimestampsOperator periodically (on the auto watermark
    interval) calls the extractor to get the current watermark and forwards
    it.
    
    This also adds an ITCase for this behaviour.

commit 5b843231fdc5dee8d4ceada02f3ff8c41daa0281
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2015-10-01T13:58:52Z

    Simplify Stream Java API Class Names
    
    KeyedDataStream -> KeyedStream
    KeyedWindowDataStream -> WindowedStream
    NonParallelWindowDataStream -> AllWindowedStream
    
    KeyedWindowFunction -> WindowFunction
    WindowFunction -> AllWindowFunction
    (along with rich functions and reduce function wrappers)
    
    WindowedStream.mapWindow -> WindowedStream.apply
    AllWindowedStream.mapWindow -> AllWindowedStream.apply
    
    Also renamed the tests to match the new names.

commit 187ed701f548666a5daaf244ee69d43032c39c6f
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2015-10-01T15:07:11Z

    Rename ConnectedDataStream to ConnectedStreams, Remove some operations
    
    The removed operations are tricky and some of them are not working
    correctly. For now, co-reduce, stream-cross and stream-join are
    removed.
    
    I'm planning to add a new join implementation based on tagged union
    that uses the new windowing code.

commit 7892f321d0b0900a4331c7ee307a34778a8476c7
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2015-10-01T15:56:13Z

    Remove groupBy and GroupedDataStream
    
    Their functionality is subsumed by keyBy and KeyedStream

commit fd729f616c4386ddc72ecc817ea166df7a8f76aa
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2015-10-01T19:23:56Z

    Add Scala API for new Windowing
    
    This adds window/timeWindow to KeyedStream along with windowAll/timeWindowAll
    on DataStream.
    
    The added API classes are AllWindowedStream and WindowedStream.
    
    This also adds translation tests similar to those for the Java API:
     - AllWindowTranslationTest.scala
     - WindowTranslationTest.scala

commit fcaa0fef2700730b768364861fc10e6bab628f47
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2015-10-02T14:48:13Z

    WIP on Javadoc

----
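
The first commit above says that an extractor which assumes ascending timestamps can derive the watermark very easily. A minimal sketch of that idea in plain Java follows. It is not the code from this pull request: the class name AscendingWatermarkSketch, its method names, and the choice of "last seen timestamp minus one" as the watermark are assumptions made for illustration only.

```java
import java.util.function.ToLongFunction;

/**
 * Hypothetical stand-in for an ascending-timestamp extractor.
 * Not Flink's AscendingTimestampExtractor; names and details are assumed.
 */
final class AscendingWatermarkSketch<T> {

    private final ToLongFunction<T> timestampOf;

    // Highest timestamp seen so far; Long.MIN_VALUE means "nothing seen yet".
    private long currentTimestamp = Long.MIN_VALUE;

    AscendingWatermarkSketch(ToLongFunction<T> timestampOf) {
        this.timestampOf = timestampOf;
    }

    /** Extracts the element's timestamp and records it as the latest one. */
    long extractTimestamp(T element) {
        long ts = timestampOf.applyAsLong(element);
        if (ts < currentTimestamp) {
            throw new IllegalStateException(
                    "Timestamp " + ts + " violates the ascending-order assumption");
        }
        currentTimestamp = ts;
        return ts;
    }

    /**
     * Would be polled periodically by the operator. Because timestamps never
     * decrease, everything strictly below the last timestamp is complete.
     * Using "last timestamp - 1" is an assumption of this sketch.
     */
    long getCurrentWatermark() {
        return currentTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : currentTimestamp - 1;
    }
}
```

With a general extractor, by contrast, the user would have to supply the logic of getCurrentWatermark() themselves, since nothing can be inferred from the element order.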


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Stream API Refactoring

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1215#issuecomment-145482078
  
    Impressive work, looks good!
    
    Merging this means that we need to commit to reworking the `join()` implementation very soon.
    
    If that is the case, +1 from my side
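
The join rework mentioned here is planned to be based on a tagged union. The pull request does not contain that implementation yet, so the following is only a rough sketch of the general technique in plain Java: elements of both inputs are wrapped in one union type so they can flow through a single window, and the window contents are then split by tag and matched by key. All names (TaggedUnionJoinSketch, TaggedUnion, joinWindow) are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

/** Hypothetical illustration of a tagged-union join; not the planned Flink code. */
final class TaggedUnionJoinSketch {

    /** Holds exactly one of two values; the tag records which input it came from. */
    static final class TaggedUnion<A, B> {
        final A left;
        final B right;
        final boolean isLeft;

        private TaggedUnion(A left, B right, boolean isLeft) {
            this.left = left;
            this.right = right;
            this.isLeft = isLeft;
        }

        static <A, B> TaggedUnion<A, B> left(A value) {
            return new TaggedUnion<>(value, null, true);
        }

        static <A, B> TaggedUnion<A, B> right(B value) {
            return new TaggedUnion<>(null, value, false);
        }
    }

    /** Joins the contents of one window: split by tag, then pair up equal keys. */
    static <A, B, K, R> List<R> joinWindow(
            List<TaggedUnion<A, B>> window,
            Function<A, K> leftKey,
            Function<B, K> rightKey,
            BiFunction<A, B, R> joinFunction) {

        Map<K, List<A>> lefts = new LinkedHashMap<>();
        Map<K, List<B>> rights = new HashMap<>();
        for (TaggedUnion<A, B> e : window) {
            if (e.isLeft) {
                lefts.computeIfAbsent(leftKey.apply(e.left), k -> new ArrayList<>()).add(e.left);
            } else {
                rights.computeIfAbsent(rightKey.apply(e.right), k -> new ArrayList<>()).add(e.right);
            }
        }

        List<R> out = new ArrayList<>();
        for (Map.Entry<K, List<A>> entry : lefts.entrySet()) {
            List<B> matches = rights.get(entry.getKey());
            if (matches == null) {
                continue; // inner join: keys without a partner produce nothing
            }
            for (A a : entry.getValue()) {
                for (B b : matches) {
                    out.add(joinFunction.apply(a, b));
                }
            }
        }
        return out;
    }
}
```

The appeal of this approach is that the join needs no windowing logic of its own: once both inputs share one element type, the regular windowing operators can do the buffering and triggering.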



[GitHub] flink pull request: Stream API Refactoring

Posted by ktzoumas <gi...@git.apache.org>.
Github user ktzoumas commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1215#discussion_r41125422
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---
    @@ -24,49 +24,169 @@
     import org.apache.flink.api.java.functions.KeySelector;
     import org.apache.flink.api.java.typeutils.TypeExtractor;
     import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
    -import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType;
     import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
     import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
    +import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
     import org.apache.flink.streaming.api.operators.StreamGroupedFold;
     import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
    +import org.apache.flink.streaming.api.transformations.OneInputTransformation;
    +import org.apache.flink.streaming.api.transformations.PartitionTransformation;
    +import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
    +import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
    +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    +import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
    +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
    +import org.apache.flink.streaming.api.windowing.time.AbstractTime;
    +import org.apache.flink.streaming.api.windowing.time.EventTime;
    +import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    +import org.apache.flink.streaming.api.windowing.windows.Window;
    +import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
    +import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
     
     /**
    - * A GroupedDataStream represents a {@link DataStream} which has been
    - * partitioned by the given {@link KeySelector}. Operators like {@link #reduce},
    - * {@link #fold} etc. can be applied on the {@link GroupedDataStream} to
    - * get additional functionality by the grouping.
    + * A {@code KeyedStream} represents a {@link DataStream} on which operator state is
    + * partitioned by key using a provided {@link KeySelector}. Typical operations supported by a
    + * {@code DataStream} are also possible on a {@code KeyedStream}, with the exception of
    + * partitioning methods such as shuffle, forward and keyBy.
      *
    - * @param <T> The type of the elements in the Grouped Stream.
    + * <p>
    + * Reduce-style operations, such as {@link #reduce}, {@link #sum} and {@link #fold} work on elements
    + * that have the same key.
    + *
    + * @param <T> The type of the elements in the Keyed Stream.
      * @param <KEY> The type of the key in the Keyed Stream.
      */
    -public class GroupedDataStream<T, KEY> extends KeyedDataStream<T, KEY> {
    +public class KeyedStream<T, KEY> extends DataStream<T> {
    +	
    +	protected final KeySelector<T, KEY> keySelector;
    +
    +	/**
    +	 * Creates a new {@link KeyedStream} using the given {@link KeySelector}
    +	 * to partition operator state by key.
    +	 * 
    +	 * @param dataStream
    +	 *            Base stream of data
    +	 * @param keySelector
    +	 *            Function for determining state partitions
    +	 */
    +	public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
    +		super(dataStream.getExecutionEnvironment(), new PartitionTransformation<>(dataStream.getTransformation(), new HashPartitioner<>(keySelector)));
    +		this.keySelector = keySelector;
    +	}
    +
    +	
    +	public KeySelector<T, KEY> getKeySelector() {
    +		return this.keySelector;
    +	}
    +
    +	
    +	@Override
    +	protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
    +		throw new UnsupportedOperationException("Cannot override partitioning for KeyedStream.");
    +	}
    +
    +	
    +	@Override
    +	public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName,
    +			TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
    +
    +		SingleOutputStreamOperator<R, ?> returnStream = super.transform(operatorName, outTypeInfo,operator);
    +
    +		((OneInputTransformation<T, R>) returnStream.getTransformation()).setStateKeySelector(keySelector);
    +		return returnStream;
    +	}
    +
    +	
    +	
    +	@Override
    +	public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
    +		DataStreamSink<T> result = super.addSink(sinkFunction);
    +		result.getTransformation().setStateKeySelector(keySelector);
    +		return result;
    +	}
    +	
    +	// ------------------------------------------------------------------------
    +	//  Windowing
    +	// ------------------------------------------------------------------------
     
     	/**
    -	 * Creates a new {@link GroupedDataStream}, group inclusion is determined using
    -	 * a {@link KeySelector} on the elements of the {@link DataStream}.
    +	 * Windows this {@code KeyedStream} into tumbling time windows.
     	 *
    -	 * @param dataStream Base stream of data
    -	 * @param keySelector Function for determining group inclusion
    +	 * <p>
    +	 * This is a shortcut for either {@code .window(TumblingTimeWindows.of(size))} or
    +	 * {@code .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic
    +	 * set using
    +	 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
    +	 *
    +	 * @param size The size of the window.
    +	 */
    +	public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size) {
    +		AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
    +
    +		if (actualSize instanceof EventTime) {
    +			return window(TumblingTimeWindows.of(actualSize));
    +		} else {
    +			return window(TumblingProcessingTimeWindows.of(actualSize));
    +		}
    +	}
    +
    +	/**
    +	 * Windows this {@code KeyedStream} into sliding time windows.
    +	 *
    +	 * <p>
    +	 * This is a shortcut for either {@code .window(SlidingTimeWindows.of(size, slide))} or
    +	 * {@code .window(SlidingProcessingTimeWindows.of(size, slide))} depending on the time characteristic
    +	 * set using
    +	 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
    +	 *
    +	 * @param size The size of the window.
     	 */
    -	public GroupedDataStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
    -		super(dataStream, keySelector);
    +	public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size, AbstractTime slide) {
    +		AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
    +		AbstractTime actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
    +
    +		if (actualSize instanceof EventTime) {
    +			return window(SlidingTimeWindows.of(actualSize, actualSlide));
    +		} else {
    +			return window(SlidingProcessingTimeWindows.of(actualSize, actualSlide));
    +		}
    +	}
    +
    +	/**
    +	 * Windows this data stream to a {@code WindowedStream}, which evaluates windows
    +	 * over a key grouped stream. Elements are put into windows by a {@link WindowAssigner}. The
    +	 * grouping of elements is done both by key and by window.
    +	 *
    +	 * <p>
    +	 * A {@link org.apache.flink.streaming.api.windowing.triggers.Trigger} can be defined to specify
    +	 * when windows are evaluated. However, {@code WindowAssigners} have a default {@code Trigger}
    +	 * that is used if a {@code Trigger} is not specified.
    +	 *
    +	 * @param assigner The {@code WindowAssigner} that assigns elements to windows.
    +	 * @return The trigger windows data stream.
    +	 */
    +	public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
    +		return new WindowedStream<>(this, assigner);
     	}
     
    +	// ------------------------------------------------------------------------
    +	//  Non-Windowed aggregation operations
    +	// ------------------------------------------------------------------------
     
     	/**
     	 * Applies a reduce transformation on the grouped data stream grouped on by
     	 * the given key position. The {@link ReduceFunction} will receive input
     	 * values based on the key value. Only input values with the same key will
     	 * go to the same reducer.
    -	 * 
    +	 *
     	 * @param reducer
     	 *            The {@link ReduceFunction} that will be called for every
     	 *            element of the input values with the same key.
     	 * @return The transformed DataStream.
     	 */
     	public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> reducer) {
    -		return transform("Grouped Reduce", getType(), new StreamGroupedReduce<T>(
    -				clean(reducer), keySelector));
    +		return transform("Grouped Reduce", getType(), new StreamGroupedReduce<>(clean(reducer), keySelector));
    --- End diff --
    
    "Grouped Reduce" or simply "Reduce"?
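
For context on what the operator in question does: the Javadoc in the diff above states that only input values with the same key go to the same reducer. A minimal plain-Java sketch of such a keyed reduce is given below. It assumes a rolling reduce that keeps one running value per key and emits the updated value for every input element; that emission behaviour, and the names KeyedReduceSketch and rollingReduce, are assumptions of this sketch and are not taken from the pull request.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

/** Hypothetical illustration of a keyed rolling reduce; not Flink's StreamGroupedReduce. */
final class KeyedReduceSketch {

    static <T, K> List<T> rollingReduce(
            List<T> input,
            Function<T, K> keySelector,
            BinaryOperator<T> reducer) {

        // One running aggregate per key, i.e. state partitioned by key.
        Map<K, T> state = new HashMap<>();
        List<T> out = new ArrayList<>();

        for (T element : input) {
            K key = keySelector.apply(element);
            T current = state.get(key);
            // The first element of a key is passed through unchanged.
            T next = (current == null) ? element : reducer.apply(current, element);
            state.put(key, next);
            out.add(next); // assumed: emit the updated aggregate per element
        }
        return out;
    }
}
```

For example, reducing the inputs 1, 2, 3, 4 with key "value modulo 2" and Integer::sum yields 1, 2, 4, 6 under these assumptions: odd and even values are summed independently of each other.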



[GitHub] flink pull request: Stream API Refactoring

Posted by ktzoumas <gi...@git.apache.org>.
Github user ktzoumas commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1215#discussion_r41125467
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---
    @@ -87,7 +207,7 @@ public GroupedDataStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelect
     		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType(),
     				Utils.getCallLocationName(), true);
     
    -		return transform("Grouped Fold", outType, new StreamGroupedFold<T, R>(clean(folder),
    +		return transform("Grouped Fold", outType, new StreamGroupedFold<>(clean(folder),
     				keySelector, initialValue));
    --- End diff --
    
    "Grouped Fold" or simply "Fold"?



[GitHub] flink pull request: Stream API Refactoring

Posted by ktzoumas <gi...@git.apache.org>.
Github user ktzoumas commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1215#discussion_r41125302
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java ---
    @@ -81,56 +81,56 @@ protected IterativeDataStream(DataStream<T> dataStream, long maxWaitTime) {
     	/**
     	 * Changes the feedback type of the iteration and allows the user to apply
     	 * co-transformations on the input and feedback stream, as in a
    -	 * {@link ConnectedDataStream}.
    +	 * {@link ConnectedStreams}.
     	 *
     	 * <p>
     	 * For type safety the user needs to define the feedback type
     	 * 
     	 * @param feedbackTypeString
     	 *            String describing the type information of the feedback stream.
    -	 * @return A {@link ConnectedIterativeDataStream}.
    +	 * @return A {@link ConnectedIterativeDataStreams}.
     	 */
    -	public <F> ConnectedIterativeDataStream<T, F> withFeedbackType(String feedbackTypeString) {
    +	public <F> ConnectedIterativeDataStreams<T, F> withFeedbackType(String feedbackTypeString) {
     		return withFeedbackType(TypeInfoParser.<F> parse(feedbackTypeString));
     	}
     
     	/**
     	 * Changes the feedback type of the iteration and allows the user to apply
     	 * co-transformations on the input and feedback stream, as in a
    -	 * {@link ConnectedDataStream}.
    +	 * {@link ConnectedStreams}.
     	 *
     	 * <p>
     	 * For type safety the user needs to define the feedback type
     	 * 
     	 * @param feedbackTypeClass
     	 *            Class of the elements in the feedback stream.
    -	 * @return A {@link ConnectedIterativeDataStream}.
    +	 * @return A {@link ConnectedIterativeDataStreams}.
     	 */
    -	public <F> ConnectedIterativeDataStream<T, F> withFeedbackType(Class<F> feedbackTypeClass) {
    +	public <F> ConnectedIterativeDataStreams<T, F> withFeedbackType(Class<F> feedbackTypeClass) {
     		return withFeedbackType(TypeExtractor.getForClass(feedbackTypeClass));
    --- End diff --
    
    Why ConnectedIterativeDataStreams and not ConnectedIterativeStreams (following the naming of the other classes)?



[GitHub] flink pull request: Stream API Refactoring

Posted by ktzoumas <gi...@git.apache.org>.
Github user ktzoumas commented on the pull request:

    https://github.com/apache/flink/pull/1215#issuecomment-145482079
  
    +1 to merge. This will make the testing of the new API much easier



[GitHub] flink pull request: Stream API Refactoring

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha closed the pull request at:

    https://github.com/apache/flink/pull/1215



[GitHub] flink pull request: Stream API Refactoring

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1215#discussion_r41125469
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java ---
    @@ -81,56 +81,56 @@ protected IterativeDataStream(DataStream<T> dataStream, long maxWaitTime) {
     	/**
     	 * Changes the feedback type of the iteration and allows the user to apply
     	 * co-transformations on the input and feedback stream, as in a
    -	 * {@link ConnectedDataStream}.
    +	 * {@link ConnectedStreams}.
     	 *
     	 * <p>
     	 * For type safety the user needs to define the feedback type
     	 * 
     	 * @param feedbackTypeString
     	 *            String describing the type information of the feedback stream.
    -	 * @return A {@link ConnectedIterativeDataStream}.
    +	 * @return A {@link ConnectedIterativeDataStreams}.
     	 */
    -	public <F> ConnectedIterativeDataStream<T, F> withFeedbackType(String feedbackTypeString) {
    +	public <F> ConnectedIterativeDataStreams<T, F> withFeedbackType(String feedbackTypeString) {
     		return withFeedbackType(TypeInfoParser.<F> parse(feedbackTypeString));
     	}
     
     	/**
     	 * Changes the feedback type of the iteration and allows the user to apply
     	 * co-transformations on the input and feedback stream, as in a
    -	 * {@link ConnectedDataStream}.
    +	 * {@link ConnectedStreams}.
     	 *
     	 * <p>
     	 * For type safety the user needs to define the feedback type
     	 * 
     	 * @param feedbackTypeClass
     	 *            Class of the elements in the feedback stream.
    -	 * @return A {@link ConnectedIterativeDataStream}.
    +	 * @return A {@link ConnectedIterativeDataStreams}.
     	 */
    -	public <F> ConnectedIterativeDataStream<T, F> withFeedbackType(Class<F> feedbackTypeClass) {
    +	public <F> ConnectedIterativeDataStreams<T, F> withFeedbackType(Class<F> feedbackTypeClass) {
     		return withFeedbackType(TypeExtractor.getForClass(feedbackTypeClass));
    --- End diff --
    
    You're right, I'll also rename those.



[GitHub] flink pull request: Stream API Refactoring

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1215#issuecomment-145254403
  
    I updated this to also add Javadocs for the new windowing semantics/internals.

