Posted to issues@flink.apache.org by mbalassi <gi...@git.apache.org> on 2015/09/21 13:20:15 UTC

[GitHub] flink pull request: [FLINK-2283] [streaming] Make grouped reduce/f...

GitHub user mbalassi opened a pull request:

    https://github.com/apache/flink/pull/1155

    [FLINK-2283] [streaming] Make grouped reduce/fold/aggregations stateful

    There is an open discussion on the related ticket [1] about fully removing the operators that this PR touches and partially removes. I can accept either outcome of that discussion, but even if the operators are later removed from the API, this PR has merit:
    
    1. Cleans up the unused `StreamReduce` and `StreamFold` operators, which should be removed either way.
    2. Adds an integration test ensuring that not only user-defined functions but also internal streaming operators can properly rely on the `OperatorState` interface. The test currently relies on the grouped reduce/fold aggregations, but this is just as important for windowing state, which is not properly checkpointed yet.
    3. Makes the grouped fold/reduce operators stateful, so that the previous test can be written.
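
The idea behind point 3 can be sketched in plain Java. This is a minimal illustration of a grouped reduce that keeps its per-key values in a manually maintained map and can snapshot and restore that map; it is not the code in this PR. The class `KeyedReduceSketch` and its methods `processElement`, `snapshotState` and `restoreState` are hypothetical names chosen for the example and are not Flink APIs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

/** Hypothetical stand-in for a grouped reduce operator; not Flink's StreamGroupedReduce. */
class KeyedReduceSketch<K, T> {

    private final Function<T, K> keySelector;
    private final BinaryOperator<T> reducer;

    // Manually maintained per-key state: the latest reduced value for each key.
    private Map<K, T> values = new HashMap<>();

    KeyedReduceSketch(Function<T, K> keySelector, BinaryOperator<T> reducer) {
        this.keySelector = keySelector;
        this.reducer = reducer;
    }

    /** Combines the element with the running value of its key and returns the new value. */
    T processElement(T element) {
        K key = keySelector.apply(element);
        T current = values.get(key);
        T next = (current == null) ? element : reducer.apply(current, element);
        values.put(key, next);
        return next;
    }

    /** Returns a copy of the state, as it would be handed to a checkpoint. */
    Map<K, T> snapshotState() {
        return new HashMap<>(values);
    }

    /** Replaces the state with a previously taken snapshot, as after a failure. */
    void restoreState(Map<K, T> snapshot) {
        values = new HashMap<>(snapshot);
    }
}
```

After `restoreState`, elements processed since the snapshot are forgotten, so a replayed input produces the same running values as before the failure. That is the property an integration test with an injected failure would check.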
    
    Some justification for the implementation choices:
    
    1. @gyfora suggested on the ticket [1] using the partitioned state instead of the manual map creation. In that scenario the grouped operators would no longer be unit testable, because they would depend on the state partitioner information found in the keyed data stream. I decided against it.
    
    2. @StephanEwen recently advised against adding unnecessary integration tests [2]. This feature can only be tested with an integration test. I feel the need to cover internal operators with a checkpointing test even though they currently use exactly the same mechanism as the UDFs, because this implementation might be subject to slight changes.
    
    3. Elaborating on the previous point: the `OperatorState` that currently stores the internal state is also accessible to the user. This is undesirable and might lead to accidental overwriting of the state. I am opening a Jira ticket for this.
    
    [1] https://issues.apache.org/jira/browse/FLINK-2283
    [2] https://mail-archives.apache.org/mod_mbox/flink-dev/201509.mbox/%3CCANC1h_vvekciNVDzqCb8N4E5Kfzu4e1Mosnse1%3DV11HXnD2PBQ%40mail.gmail.com%3E

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mbalassi/flink aggregator-states

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1155.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1155
    
----
commit 29ca808ccb8a1d705927eabb492e70df5e5af06c
Author: mbalassi <mb...@apache.org>
Date:   2015-09-11T14:32:09Z

    [streaming] Removed unused StreamReduce
    
    Refactored corresponding tests, some minor cleanups.

commit 4bd1dd035780402919bb5257274e9258457dadf3
Author: mbalassi <mb...@apache.org>
Date:   2015-09-13T06:19:07Z

    [FLINK-2283] [streaming] grouped reduce and fold operators checkpoint state

commit 3688a7c98500179f454e1641aedd7758b1fdc644
Author: mbalassi <mb...@apache.org>
Date:   2015-09-20T20:27:11Z

    [FLINK-2283] [streaming] Test for checkpointing in internal operators

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2283] [streaming] Make grouped reduce/f...

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/1155#issuecomment-141957390
  
    @aljoscha 
    A custom StateCheckpointer should be used in this case; I fixed some related issues in https://github.com/apache/flink/pull/1154.



[GitHub] flink pull request: [FLINK-2283] [streaming] Make grouped reduce/f...

Posted by mbalassi <gi...@git.apache.org>.
Github user mbalassi commented on the pull request:

    https://github.com/apache/flink/pull/1155#issuecomment-141954741
  
    @gyfora: I do not necessarily see how that fits together yet, but let us brainstorm on the topic.
    @StephanEwen: This is the "infinite" version, but I would like to continue with adding it to the windowed versions.
    @aljoscha: Thanks for spotting it, that is a problem.



[GitHub] flink pull request: [FLINK-2283] [streaming] Make grouped reduce/f...

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/1155#issuecomment-141945790
  
    I still think that we should use PartitionedOperatorStates here; that will also help if we start thinking about dynamic repartitioning. It is not hard to modify the tests so that they use some partitioning info (so if it is unit testable with local state, it is unit testable with partitioned state).



[GitHub] flink pull request: [FLINK-2283] [streaming] Make grouped reduce/f...

Posted by mbalassi <gi...@git.apache.org>.
Github user mbalassi commented on the pull request:

    https://github.com/apache/flink/pull/1155#issuecomment-145230558
  
    Quick update:
    I made some progress on the custom state checkpointer solution, but ran into an issue with key selectors. This would break the current API usage of aggregations, so I need to solve it or work around it first. [1]
    
    [1] https://issues.apache.org/jira/browse/FLINK-2812



[GitHub] flink pull request: [FLINK-2283] [streaming] Make grouped reduce/f...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1155#issuecomment-145489620
  
    @marton: I am making an attempt to make the state backend abstraction a bit more powerful, as described here: https://issues.apache.org/jira/browse/FLINK-2808



[GitHub] flink pull request: [FLINK-2283] [streaming] Make grouped reduce/f...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1155#issuecomment-145867855
  
    I think it is fine. As I see it, your changes only use the key/value state, which will continue to be surfaced in the same way, so it should work together fine.



[GitHub] flink pull request: [FLINK-2283] [streaming] Make grouped reduce/f...

Posted by mbalassi <gi...@git.apache.org>.
Github user mbalassi commented on the pull request:

    https://github.com/apache/flink/pull/1155#issuecomment-145665200
  
    @StephanEwen : Thanks for the heads-up.
    
    Now I have addressed the comments and would like to merge this tomorrow if there are no objections.



[GitHub] flink pull request: [FLINK-2283] [streaming] Make grouped reduce/f...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1155#issuecomment-145868188
  
    @mbalassi This is a valid case for an integration test!



[GitHub] flink pull request: [FLINK-2283] [streaming] Make grouped reduce/f...

Posted by mbalassi <gi...@git.apache.org>.
Github user mbalassi commented on the pull request:

    https://github.com/apache/flink/pull/1155#issuecomment-145869707
  
    @StephanEwen: Thanks, I will go ahead and merge it.
    I still had a small hiccup in the Scala API, which I have just fixed, so I am waiting for Travis.



[GitHub] flink pull request: [FLINK-2283] [streaming] Make grouped reduce/f...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1155#issuecomment-141947734
  
    Is this addressing windowed grouped operations, or the "infinite state" aggregations?



[GitHub] flink pull request: [FLINK-2283] [streaming] Make grouped reduce/f...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1155



[GitHub] flink pull request: [FLINK-2283] [streaming] Make grouped reduce/f...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1155#issuecomment-141952418
  
    It is for the "infinite state" aggregations.
    
    I think this will fail if the types (input/output and key type) are not serializable. For those to work, a TypeSerializer must be used to put a serialized value into the state map.
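
To make the concern concrete, here is a rough sketch in plain Java, under stated assumptions. The state map holds bytes produced by an explicit serializer, so the value type itself need not implement `java.io.Serializable`. `ValueSerializer`, `Counter`, `CounterSerializer` and `SerializedStateMap` are hypothetical names for this illustration. `ValueSerializer` is a simplified stand-in and does not reproduce Flink's `TypeSerializer` API. Only values are serialized here; a non-serializable key type would need the same treatment.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified serializer interface; not Flink's TypeSerializer. */
interface ValueSerializer<T> {
    void serialize(T value, DataOutputStream out) throws IOException;
    T deserialize(DataInputStream in) throws IOException;
}

/** A value type that deliberately does not implement java.io.Serializable. */
final class Counter {
    final long count;
    Counter(long count) { this.count = count; }
}

/** Writes a Counter as a single long. */
final class CounterSerializer implements ValueSerializer<Counter> {
    @Override
    public void serialize(Counter value, DataOutputStream out) throws IOException {
        out.writeLong(value.count);
    }

    @Override
    public Counter deserialize(DataInputStream in) throws IOException {
        return new Counter(in.readLong());
    }
}

/** State map that stores serialized bytes instead of the value objects themselves. */
class SerializedStateMap<K, V> {
    private final ValueSerializer<V> serializer;
    private final Map<K, byte[]> bytesByKey = new HashMap<>();

    SerializedStateMap(ValueSerializer<V> serializer) {
        this.serializer = serializer;
    }

    /** Serializes the value and stores the resulting bytes under the key. */
    void put(K key, V value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            serializer.serialize(value, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        bytesByKey.put(key, buffer.toByteArray());
    }

    /** Deserializes the stored bytes for the key, or returns null if the key is absent. */
    V get(K key) {
        byte[] bytes = bytesByKey.get(key);
        if (bytes == null) {
            return null;
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            return serializer.deserialize(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The trade-off in this sketch is a serialization round trip on every access, in exchange for a map whose values are plain byte arrays and can be written to a checkpoint regardless of the value type.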



[GitHub] flink pull request: [FLINK-2283] [streaming] Make grouped reduce/f...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1155#discussion_r39965297
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java ---
    @@ -0,0 +1,267 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.checkpointing;
    +
    +import com.google.common.collect.EvictingQueue;
    +import org.apache.flink.api.common.functions.FlatMapFunction;
    +import org.apache.flink.api.common.functions.FoldFunction;
    +import org.apache.flink.api.common.functions.ReduceFunction;
    +import org.apache.flink.api.common.functions.RichMapFunction;
    +import org.apache.flink.api.common.state.OperatorState;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.checkpoint.Checkpointed;
    +import org.apache.flink.streaming.api.datastream.GroupedDataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    +import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
    +import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
    +import org.apache.flink.util.Collector;
    +import org.junit.Assert;
    +
    +import java.util.Queue;
    +import java.util.Random;
    +
    +/**
    + * Integration test ensuring that the persistent state defined by the implementations
    + * of {@link AbstractUdfStreamOperator} is correctly restored in case of recovery from
    + * a failure.
    + *
    + * <p>
    + * The topology currently tests the proper behaviour of the {@link StreamGroupedReduce}
    + * operator.
    + */
    +@SuppressWarnings("serial")
    +public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTestBase {
    +
    +	final private static long NUM_INPUT = 2_500_000L;
    +	final private static int NUM_OUTPUT = 1_000;
    +
    +	/**
    +	 * Assembles a stream of a grouping field and some long data. Applies reduce functions
    +	 * on this stream.
    +	 */
    +	@Override
    +	public void testProgram(StreamExecutionEnvironment env) {
    +
    +		// base stream
    +		GroupedDataStream<Tuple2<Integer, Long>> stream = env.addSource(new StatefulMultipleSequence())
    +				.groupBy(0);
    +
    +
    +		stream
    +				// testing built-in aggregate
    +				.min(1)
    +				// failure generation
    +				.map(new OnceFailingIdentityMapFunction(NUM_INPUT))
    +				.groupBy(0)
    +				.addSink(new MinEvictingQueueSink());
    +
    +		stream
    +				// testing UDF reducer
    +				.reduce(new ReduceFunction<Tuple2<Integer, Long>>() {
    +					@Override
    +					public Tuple2<Integer, Long> reduce(
    +							Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception {
    +						return Tuple2.of(value1.f0, value1.f1 + value2.f1);
    +					}
    +				})
    +				.groupBy(0)
    +				.addSink(new SumEvictingQueueSink());
    +
    +		stream
    +				// testing UDF folder
    +				.fold(Tuple2.of(0, 0L), new FoldFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>>() {
    +					@Override
    +					public Tuple2<Integer, Long> fold(
    +							Tuple2<Integer, Long> accumulator, Tuple2<Integer, Long> value) throws Exception {
    +						return Tuple2.of(value.f0, accumulator.f1 + value.f1);
    +					}
    +				})
    +				.groupBy(0)
    +				.addSink(new FoldEvictingQueueSink());
    --- End diff --
    
    Formatting looks off on GitHub

