Posted to issues@flink.apache.org by aljoscha <gi...@git.apache.org> on 2015/10/02 16:53:35 UTC
[GitHub] flink pull request: Stream API Refactoring
GitHub user aljoscha opened a pull request:
https://github.com/apache/flink/pull/1215
Stream API Refactoring
This is a WIP of the refactoring. I still want to add Javadocs and a new join operator based on tagged union that uses the new windowing operators.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/aljoscha/flink api-rework
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/1215.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1215
----
commit 560090348f8df93160501a55051a913625c7215e
Author: Aljoscha Krettek <al...@gmail.com>
Date: 2015-09-30T13:05:13Z
[FLINK-2666] Add timestamp extraction operator
This adds a user function TimestampExtractor and an operator
ExtractTimestampsOperator that can be used to extract timestamps and
attach them to elements to do event-time windowing.
Users can either use an AscendingTimestampExtractor, which assumes that
timestamps are monotonically increasing (this makes deriving the
watermark very easy), or a TimestampExtractor, for which they also have
to provide the watermark.
The ExtractTimestampsOperator periodically (on the auto watermark
interval) calls the extractor to get the current watermark and forwards
it.
This also adds an ITCase for this behaviour.
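To illustrate the difference between the two extractor flavours, here is a minimal plain-Java sketch. It is not the Flink API: TimestampExtractorSketch and AscendingExtractorSketch are hypothetical names, and using the last seen timestamp minus one as the derived watermark is an assumption. A general TimestampExtractor would instead have to implement getCurrentWatermark() from its own knowledge of how out of order the input can be.

```java
import java.util.function.ToLongFunction;

/** Hypothetical stand-in for a timestamp extractor; not the Flink interface. */
interface TimestampExtractorSketch<T> {
    /** Returns the event-time timestamp to attach to the element. */
    long extractTimestamp(T element);

    /** Returns the current watermark; an operator would poll this periodically. */
    long getCurrentWatermark();
}

/**
 * Assumes timestamps never decrease, so the watermark can simply trail the
 * last seen timestamp (assumption: last timestamp minus one).
 */
class AscendingExtractorSketch<T> implements TimestampExtractorSketch<T> {
    private final ToLongFunction<T> timestampOf;
    private long currentTimestamp = Long.MIN_VALUE;

    AscendingExtractorSketch(ToLongFunction<T> timestampOf) {
        this.timestampOf = timestampOf;
    }

    @Override
    public long extractTimestamp(T element) {
        long ts = timestampOf.applyAsLong(element);
        // This sketch rejects out-of-order input, since the watermark relies on ascending timestamps.
        if (ts < currentTimestamp) {
            throw new IllegalStateException(
                    "Timestamp " + ts + " is lower than previous " + currentTimestamp);
        }
        currentTimestamp = ts;
        return ts;
    }

    @Override
    public long getCurrentWatermark() {
        // Avoid underflow before the first element has been seen.
        return currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1;
    }
}

class TimestampSketchDemo {
    public static void main(String[] args) {
        AscendingExtractorSketch<Long> extractor = new AscendingExtractorSketch<>(x -> x);
        for (long ts : new long[] {10L, 12L, 15L}) {
            extractor.extractTimestamp(ts);
        }
        // An operator would poll this on the auto watermark interval and forward it.
        System.out.println(extractor.getCurrentWatermark()); // 14
    }
}
```

The ascending case needs no user input for the watermark because no later element can carry a smaller timestamp than the last one seen.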
commit 5b843231fdc5dee8d4ceada02f3ff8c41daa0281
Author: Aljoscha Krettek <al...@gmail.com>
Date: 2015-10-01T13:58:52Z
Simplify Stream Java API Class Names
KeyedDataStream -> KeyedStream
KeyedWindowDataStream -> WindowedStream
NonParallelWindowDataStream -> AllWindowedStream
KeyedWindowFunction -> WindowFunction
WindowFunction -> AllWindowFunction
(along with rich functions and reduce function wrappers)
WindowedStream.mapWindow -> WindowedStream.apply
AllWindowedStream.mapWindow -> AllWindowedStream.apply
Also renamed the tests to match the new names.
commit 187ed701f548666a5daaf244ee69d43032c39c6f
Author: Aljoscha Krettek <al...@gmail.com>
Date: 2015-10-01T15:07:11Z
Rename ConnectedDataStream to ConnectedStreams, Remove some operations
The removed operations are tricky and some of them are not working
correctly. For now, co-reduce, stream-cross and stream-join are
removed.
I'm planning to add a new join implementation based on tagged union
that uses the new windowing code.
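One way a tagged-union join could work: elements of both inputs are wrapped in a common type that records which input they came from, the wrapped streams are unioned and windowed by key, and each window is split back into its two sides and joined. The sketch below models only that last per-window step in plain Java. TaggedUnionSketch and joinWindow are hypothetical names, and the nested-loop join is an assumption, not the planned implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

/** Hypothetical tagged union: holds exactly one non-null value, from input one or input two. */
final class TaggedUnionSketch<T1, T2> {
    private final T1 one;
    private final T2 two;

    private TaggedUnionSketch(T1 one, T2 two) {
        this.one = one;
        this.two = two;
    }

    static <T1, T2> TaggedUnionSketch<T1, T2> one(T1 value) {
        return new TaggedUnionSketch<>(value, null);
    }

    static <T1, T2> TaggedUnionSketch<T1, T2> two(T2 value) {
        return new TaggedUnionSketch<>(null, value);
    }

    // Null values are not supported: the tag is derived from which field is set.
    boolean isOne() { return one != null; }
    T1 getOne() { return one; }
    T2 getTwo() { return two; }
}

class TaggedUnionJoinSketch {
    /**
     * Joins the contents of one window for one key: splits the tagged elements
     * back into the two inputs and emits the join result for every left/right pair.
     */
    static <T1, T2, R> List<R> joinWindow(
            Iterable<TaggedUnionSketch<T1, T2>> windowContents,
            BiFunction<T1, T2, R> joinFunction) {
        List<T1> left = new ArrayList<>();
        List<T2> right = new ArrayList<>();
        for (TaggedUnionSketch<T1, T2> element : windowContents) {
            if (element.isOne()) {
                left.add(element.getOne());
            } else {
                right.add(element.getTwo());
            }
        }
        List<R> result = new ArrayList<>();
        for (T1 l : left) {
            for (T2 r : right) {
                result.add(joinFunction.apply(l, r));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<TaggedUnionSketch<String, Integer>> window = new ArrayList<>();
        window.add(TaggedUnionSketch.one("a"));
        window.add(TaggedUnionSketch.two(1));
        window.add(TaggedUnionSketch.two(2));
        System.out.println(joinWindow(window, (s, i) -> s + i)); // [a1, a2]
    }
}
```

Because both inputs end up in one stream of a single type, the existing keyed windowing can be reused unchanged; only the per-window function has to know about the two sides.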
commit 7892f321d0b0900a4331c7ee307a34778a8476c7
Author: Aljoscha Krettek <al...@gmail.com>
Date: 2015-10-01T15:56:13Z
Remove groupBy and GroupedDataStream
Their functionality is subsumed by keyBy and KeyedStream
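Since keyBy now subsumes groupBy, reduce on a KeyedStream keeps one running value per key. Below is a plain-Java model of that rolling reduce. The class KeyedRollingReduceSketch is a hypothetical stand-in for the actual operator; like a rolling reduce, it emits the updated value for every input element.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

/**
 * Hypothetical model of a rolling reduce on a keyed stream: the reduced value
 * is kept per key, and the updated value is emitted for every incoming element.
 */
class KeyedRollingReduceSketch<T, K> {
    private final Function<T, K> keySelector;
    private final BinaryOperator<T> reducer;
    private final Map<K, T> state = new HashMap<>();

    KeyedRollingReduceSketch(Function<T, K> keySelector, BinaryOperator<T> reducer) {
        this.keySelector = keySelector;
        this.reducer = reducer;
    }

    T process(T element) {
        K key = keySelector.apply(element);
        // merge() stores the element for a new key, or reduces it with the previous value.
        return state.merge(key, element, reducer);
    }
}

class KeyedReduceDemo {
    public static void main(String[] args) {
        // Key by parity, sum per key.
        KeyedRollingReduceSketch<Integer, Integer> sumByParity =
                new KeyedRollingReduceSketch<>(x -> x % 2, Integer::sum);
        List<Integer> out = new ArrayList<>();
        for (int x : new int[] {1, 2, 3, 4}) {
            out.add(sumByParity.process(x));
        }
        System.out.println(out); // [1, 2, 4, 6]
    }
}
```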
commit fd729f616c4386ddc72ecc817ea166df7a8f76aa
Author: Aljoscha Krettek <al...@gmail.com>
Date: 2015-10-01T19:23:56Z
Add Scala API for new Windowing
This adds window/timeWindow to KeyedStream along with windowAll/timeWindowAll
on DataStream.
The added API classes are AllWindowedStream and WindowedStream.
This also adds translation tests similar to those for the Java API:
- AllWindowTranslationTest.scala
- WindowTranslationTest.scala
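The tumbling and sliding time windows behind window/timeWindow can be illustrated by computing which windows an element's timestamp falls into. This is a plain-Java sketch with hypothetical helper names. It assumes non-negative timestamps and windows aligned to timestamp 0, and it is not the code of the Flink assigners.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helpers computing which time windows a timestamp falls into. */
class TimeWindowAssignmentSketch {

    /** Start of the single tumbling window [start, start + size) containing ts (ts >= 0 assumed). */
    static long tumblingStart(long ts, long size) {
        return ts - (ts % size);
    }

    /** Starts of all sliding windows [start, start + size) that contain ts (ts >= 0 assumed). */
    static List<Long> slidingStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before ts, then step back by slide while the window still covers ts.
        long lastStart = ts - (ts % slide);
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        System.out.println(tumblingStart(17, 5));     // 15
        System.out.println(slidingStarts(17, 10, 5)); // [15, 10]
    }
}
```

With size 10 and slide 5, every element lands in size / slide = 2 windows; a tumbling window is the special case where slide equals size.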
commit fcaa0fef2700730b768364861fc10e6bab628f47
Author: Aljoscha Krettek <al...@gmail.com>
Date: 2015-10-02T14:48:13Z
WIP on Javadoc
----
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
[GitHub] flink pull request: Stream API Refactoring
Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:
https://github.com/apache/flink/pull/1215#issuecomment-145482078
Impressive work, looks good!
Merging this means that we need to commit to reworking the `join()` implementation very soon.
If that is the case, +1 from my side
---
[GitHub] flink pull request: Stream API Refactoring
Posted by ktzoumas <gi...@git.apache.org>.
Github user ktzoumas commented on a diff in the pull request:
https://github.com/apache/flink/pull/1215#discussion_r41125422
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---
@@ -24,49 +24,169 @@
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
-import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType;
import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamGroupedFold;
import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.time.EventTime;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
/**
- * A GroupedDataStream represents a {@link DataStream} which has been
- * partitioned by the given {@link KeySelector}. Operators like {@link #reduce},
- * {@link #fold} etc. can be applied on the {@link GroupedDataStream} to
- * get additional functionality by the grouping.
+ * A {@code KeyedStream} represents a {@link DataStream} on which operator state is
+ * partitioned by key using a provided {@link KeySelector}. Typical operations supported by a
+ * {@code DataStream} are also possible on a {@code KeyedStream}, with the exception of
+ * partitioning methods such as shuffle, forward and keyBy.
*
- * @param <T> The type of the elements in the Grouped Stream.
+ * <p>
+ * Reduce-style operations, such as {@link #reduce}, {@link #sum} and {@link #fold} work on elements
+ * that have the same key.
+ *
+ * @param <T> The type of the elements in the Keyed Stream.
* @param <KEY> The type of the key in the Keyed Stream.
*/
-public class GroupedDataStream<T, KEY> extends KeyedDataStream<T, KEY> {
+public class KeyedStream<T, KEY> extends DataStream<T> {
+
+ protected final KeySelector<T, KEY> keySelector;
+
+ /**
+ * Creates a new {@link KeyedStream} using the given {@link KeySelector}
+ * to partition operator state by key.
+ *
+ * @param dataStream
+ * Base stream of data
+ * @param keySelector
+ * Function for determining state partitions
+ */
+ public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
+ super(dataStream.getExecutionEnvironment(), new PartitionTransformation<>(dataStream.getTransformation(), new HashPartitioner<>(keySelector)));
+ this.keySelector = keySelector;
+ }
+
+
+ public KeySelector<T, KEY> getKeySelector() {
+ return this.keySelector;
+ }
+
+
+ @Override
+ protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
+ throw new UnsupportedOperationException("Cannot override partitioning for KeyedStream.");
+ }
+
+
+ @Override
+ public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName,
+ TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
+
+ SingleOutputStreamOperator<R, ?> returnStream = super.transform(operatorName, outTypeInfo,operator);
+
+ ((OneInputTransformation<T, R>) returnStream.getTransformation()).setStateKeySelector(keySelector);
+ return returnStream;
+ }
+
+
+
+ @Override
+ public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
+ DataStreamSink<T> result = super.addSink(sinkFunction);
+ result.getTransformation().setStateKeySelector(keySelector);
+ return result;
+ }
+
+ // ------------------------------------------------------------------------
+ // Windowing
+ // ------------------------------------------------------------------------
/**
- * Creates a new {@link GroupedDataStream}, group inclusion is determined using
- * a {@link KeySelector} on the elements of the {@link DataStream}.
+ * Windows this {@code KeyedStream} into tumbling time windows.
*
- * @param dataStream Base stream of data
- * @param keySelector Function for determining group inclusion
+ * <p>
+ * This is a shortcut for either {@code .window(TumblingTimeWindows.of(size))} or
+ * {@code .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic
+ * set using
+ * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
+ *
+ * @param size The size of the window.
+ */
+ public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size) {
+ AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
+
+ if (actualSize instanceof EventTime) {
+ return window(TumblingTimeWindows.of(actualSize));
+ } else {
+ return window(TumblingProcessingTimeWindows.of(actualSize));
+ }
+ }
+
+ /**
+ * Windows this {@code KeyedStream} into sliding time windows.
+ *
+ * <p>
+ * This is a shortcut for either {@code .window(SlidingTimeWindows.of(size, slide))} or
+ * {@code .window(SlidingProcessingTimeWindows.of(size, slide))} depending on the time characteristic
+ * set using
+ * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
+ *
+ * @param size The size of the window.
*/
- public GroupedDataStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
- super(dataStream, keySelector);
+ public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size, AbstractTime slide) {
+ AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
+ AbstractTime actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
+
+ if (actualSize instanceof EventTime) {
+ return window(SlidingTimeWindows.of(actualSize, actualSlide));
+ } else {
+ return window(SlidingProcessingTimeWindows.of(actualSize, actualSlide));
+ }
+ }
+
+ /**
+ * Windows this data stream to a {@code WindowedStream}, which evaluates windows
+ * over a key grouped stream. Elements are put into windows by a {@link WindowAssigner}. The
+ * grouping of elements is done both by key and by window.
+ *
+ * <p>
+ * A {@link org.apache.flink.streaming.api.windowing.triggers.Trigger} can be defined to specify
+ * when windows are evaluated. However, {@code WindowAssigners} have a default {@code Trigger}
+ * that is used if a {@code Trigger} is not specified.
+ *
+ * @param assigner The {@code WindowAssigner} that assigns elements to windows.
+ * @return The trigger windows data stream.
+ */
+ public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
+ return new WindowedStream<>(this, assigner);
}
+ // ------------------------------------------------------------------------
+ // Non-Windowed aggregation operations
+ // ------------------------------------------------------------------------
/**
* Applies a reduce transformation on the grouped data stream grouped on by
* the given key position. The {@link ReduceFunction} will receive input
* values based on the key value. Only input values with the same key will
* go to the same reducer.
- *
+ *
* @param reducer
* The {@link ReduceFunction} that will be called for every
* element of the input values with the same key.
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> reducer) {
- return transform("Grouped Reduce", getType(), new StreamGroupedReduce<T>(
- clean(reducer), keySelector));
+ return transform("Grouped Reduce", getType(), new StreamGroupedReduce<>(clean(reducer), keySelector));
--- End diff --
"Grouped Reduce" or simply "Reduce"?
---
[GitHub] flink pull request: Stream API Refactoring
Posted by ktzoumas <gi...@git.apache.org>.
Github user ktzoumas commented on a diff in the pull request:
https://github.com/apache/flink/pull/1215#discussion_r41125467
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---
@@ -87,7 +207,7 @@ public GroupedDataStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelect
TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType(),
Utils.getCallLocationName(), true);
- return transform("Grouped Fold", outType, new StreamGroupedFold<T, R>(clean(folder),
+ return transform("Grouped Fold", outType, new StreamGroupedFold<>(clean(folder),
keySelector, initialValue));
--- End diff --
"Grouped Fold" or simply "Fold"?
---
[GitHub] flink pull request: Stream API Refactoring
Posted by ktzoumas <gi...@git.apache.org>.
Github user ktzoumas commented on a diff in the pull request:
https://github.com/apache/flink/pull/1215#discussion_r41125302
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java ---
@@ -81,56 +81,56 @@ protected IterativeDataStream(DataStream<T> dataStream, long maxWaitTime) {
/**
* Changes the feedback type of the iteration and allows the user to apply
* co-transformations on the input and feedback stream, as in a
- * {@link ConnectedDataStream}.
+ * {@link ConnectedStreams}.
*
* <p>
* For type safety the user needs to define the feedback type
*
* @param feedbackTypeString
* String describing the type information of the feedback stream.
- * @return A {@link ConnectedIterativeDataStream}.
+ * @return A {@link ConnectedIterativeDataStreams}.
*/
- public <F> ConnectedIterativeDataStream<T, F> withFeedbackType(String feedbackTypeString) {
+ public <F> ConnectedIterativeDataStreams<T, F> withFeedbackType(String feedbackTypeString) {
return withFeedbackType(TypeInfoParser.<F> parse(feedbackTypeString));
}
/**
* Changes the feedback type of the iteration and allows the user to apply
* co-transformations on the input and feedback stream, as in a
- * {@link ConnectedDataStream}.
+ * {@link ConnectedStreams}.
*
* <p>
* For type safety the user needs to define the feedback type
*
* @param feedbackTypeClass
* Class of the elements in the feedback stream.
- * @return A {@link ConnectedIterativeDataStream}.
+ * @return A {@link ConnectedIterativeDataStreams}.
*/
- public <F> ConnectedIterativeDataStream<T, F> withFeedbackType(Class<F> feedbackTypeClass) {
+ public <F> ConnectedIterativeDataStreams<T, F> withFeedbackType(Class<F> feedbackTypeClass) {
return withFeedbackType(TypeExtractor.getForClass(feedbackTypeClass));
--- End diff --
Why ConnectedIterativeDataStreams and not ConnectedIterativeStreams (following the naming of the other classes)?
---
[GitHub] flink pull request: Stream API Refactoring
Posted by ktzoumas <gi...@git.apache.org>.
Github user ktzoumas commented on the pull request:
https://github.com/apache/flink/pull/1215#issuecomment-145482079
+1 to merge. This will make the testing of the new API much easier
---
[GitHub] flink pull request: Stream API Refactoring
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha closed the pull request at:
https://github.com/apache/flink/pull/1215
---
[GitHub] flink pull request: Stream API Refactoring
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/1215#discussion_r41125469
You're right, I'll also rename those.
---
[GitHub] flink pull request: Stream API Refactoring
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:
https://github.com/apache/flink/pull/1215#issuecomment-145254403
I updated this to also add Javadocs for the new windowing semantics/internals.
---