Posted to issues@flink.apache.org by aljoscha <gi...@git.apache.org> on 2016/06/16 09:12:46 UTC

[GitHub] flink pull request #2110: [FLINK-3974] Fix object reuse with multi-chaining

GitHub user aljoscha opened a pull request:

    https://github.com/apache/flink/pull/2110

    [FLINK-3974] Fix object reuse with multi-chaining

    Before, a job would fail if object reuse was enabled and multiple
    operators were chained to one upstream operator. Now, we always create a
    shallow copy of the StreamRecord in OperatorChain.ChainingOutput because
    downstream operations change/reuse the StreamRecord.
    
    This fix was contributed by @wanderingbort (if this is the right GitHub handle) as a patch on the Flink Jira. I can change the commit to attribute it to him, but so far he hasn't responded to my question about this on Jira.
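
The failure mode described above can be reproduced outside Flink with a minimal sketch. `Rec`, `MapOp`, and `SharedRecordDemo` are hypothetical stand-ins, not Flink classes. The sketch assumes that a chained operator overwrites the value of the wrapper it receives, and that a shallow copy creates a new wrapper around the same value object. In this sketch the shared wrapper surfaces as a `ClassCastException`; how the error shows up in a real job may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class SharedRecordDemo {

    /** Hypothetical stand-in for a mutable record wrapper (not Flink's StreamRecord). */
    static final class Rec {
        Object value;
        Rec(Object value) { this.value = value; }
        /** Shallow copy: a new wrapper around the same value object. */
        Rec shallowCopy() { return new Rec(value); }
    }

    /** Hypothetical map operator that reuses its input wrapper for its output. */
    static final class MapOp {
        final Function<Object, Object> fn;
        final List<Object> sink;
        MapOp(Function<Object, Object> fn, List<Object> sink) {
            this.fn = fn;
            this.sink = sink;
        }
        void process(Rec r) {
            r.value = fn.apply(r.value); // mutates the incoming wrapper in place
            sink.add(r.value);
        }
    }

    /** Feeds the value 1 to two operators that hang off the same upstream output. */
    public static List<Object> run(boolean copyPerOperator) {
        List<Object> results = new ArrayList<>();
        MapOp toText = new MapOp(v -> "n=" + v, results);
        MapOp timesTen = new MapOp(v -> (Integer) v * 10, results);
        Rec shared = new Rec(1);
        for (MapOp op : Arrays.asList(toText, timesTen)) {
            op.process(copyPerOperator ? shared.shallowCopy() : shared);
        }
        return results;
    }

    public static void main(String[] args) {
        try {
            run(false);
        } catch (ClassCastException e) {
            // The second operator saw the String written by the first one.
            System.out.println("shared wrapper failed: " + e.getClass().getSimpleName());
        }
        System.out.println("shallow copies: " + run(true));
    }
}
```

With one shallow copy per operator, each operator mutates only its own wrapper, so the second operator still sees the original integer.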

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink chaining/fix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2110.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2110
    
----
commit 092f350cccbda32331f527c4eaf7ad3304fa1811
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2016-06-14T10:18:35Z

    [FLINK-3974] Fix object reuse with multi-chaining
    
    Before, a job would fail if object reuse was enabled and multiple
    operators were chained to one upstream operator. Now, we always create a
    shallow copy of the StreamRecord in OperatorChain.ChainingOutput because
    downstream operations change/reuse the StreamRecord.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2110: [FLINK-3974] Fix object reuse with multi-chaining

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2110
  
    @tillrohrmann I pushed a commit that removes the per-operator object-reuse setting, refactors broadcasting and direct outputs and changes the ITCase to a test case. Happy reviewing. 😃



[GitHub] flink issue #2110: [FLINK-3974] Fix object reuse with multi-chaining

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2110
  
    Aw man, I already had "rename to Test" in my commit but forgot to add that ... 😅
    
    Thanks again, @tillrohrmann, I'll make the changes and merge.



[GitHub] flink issue #2110: [FLINK-3974] Fix object reuse with multi-chaining

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2110
  
    Thanks for the thorough review, @tillrohrmann!
    
    Your points are valid, and I may have to change this PR, but let me first explain my reasoning.
    
    The shallow copy is performed in the one place that all code paths have to go through because it is the point right before control is passed to the operator. Putting it in a different place would mean placing it in `BroadcastingOutputCollector`, as you mentioned, as well as in https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java, which is used when the user does a split()/select() operation (`DataStream.split()`). The number of places where we have to put this might evolve in the future.
    
    Also, putting it in `BroadcastingOutputCollector` and `DirectedOutput` would mean that we always do two copies per record for the common case of having object-copying enabled (which is the default).
    
    About the ITCase. I also don't like having that in there because we are approaching the 2h mark on Travis but I think in this case it's valid. This test really verifies that the whole system works correctly when the user uses a certain feature (I would also add a test for split()/select() now that I thought about it). 



[GitHub] flink issue #2110: [FLINK-3974] Fix object reuse with multi-chaining

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2110
  
    Excellent work @aljoscha. Great fix for the problem and it's also really nice that we could get rid of the IT case :-) +1 for merging after addressing my minor comments.



[GitHub] flink issue #2110: [FLINK-3974] Fix object reuse with multi-chaining

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2110
  
    I'll try to come up with something. I'll probably remove `isInputCopyDisabled` from the operator and only allow a global setting for object reuse. This should simplify things.



[GitHub] flink pull request #2110: [FLINK-3974] Fix object reuse with multi-chaining

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2110#discussion_r68239970
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/CopyingDirectedOutput.java ---
    @@ -0,0 +1,51 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.collector.selector;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.streaming.api.graph.StreamEdge;
    +import org.apache.flink.streaming.api.operators.Output;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +
    +import java.util.List;
    +import java.util.Set;
    +
    +
    +/**
    + * Special version of {@link DirectedOutput} that performs a shallow copy of the
    + * {@link StreamRecord} to ensure that multi-chaining works correctly.
    + */
    +public class CopyingDirectedOutput<OUT> extends DirectedOutput<OUT> {
    +
    +	@SuppressWarnings({"unchecked", "rawtypes"})
    +	public CopyingDirectedOutput(
    +			List<OutputSelector<OUT>> outputSelectors,
    +			List<Tuple2<Output<StreamRecord<OUT>>, StreamEdge>> outputs) {
    +		super(outputSelectors, outputs);
    +	}
    +
    +	@Override
    +	public void collect(StreamRecord<OUT> record) {
    +		Set<Output<StreamRecord<OUT>>> selectedOutputs = selectOutputs(record);
    +
    +		for (Output<StreamRecord<OUT>> out : selectedOutputs) {
    +			StreamRecord<OUT> shallowCopy = record.copy(record.getValue());
    +			out.collect(shallowCopy);
    +		}
    --- End diff --
    
    Can't we save one copy operation by giving `record` to the last selected output?
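
The suggestion above can be sketched as follows. This is a minimal, self-contained illustration, not the code that was merged: `Rec` and `Out` are hypothetical stand-ins for `StreamRecord` and `Output`, and `deliver` is a demo helper. It assumes the caller does not touch the record again after `collect` returns, since the last output may mutate it.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class LastOutputReuseSketch {

    /** Hypothetical stand-in for StreamRecord; copy(value) mirrors the call used in the diff. */
    public static final class Rec<T> {
        private final T value;
        public Rec(T value) { this.value = value; }
        public T getValue() { return value; }
        public Rec<T> copy(T valueCopy) { return new Rec<>(valueCopy); }
    }

    /** Hypothetical stand-in for Output<StreamRecord<T>>. */
    public interface Out<T> {
        void collect(Rec<T> rec);
    }

    /** Gives shallow copies to every selected output except the last, which gets the original. */
    public static <T> void collect(Rec<T> rec, Set<Out<T>> selectedOutputs) {
        Iterator<Out<T>> it = selectedOutputs.iterator();
        while (it.hasNext()) {
            Out<T> out = it.next();
            if (it.hasNext()) {
                out.collect(rec.copy(rec.getValue()));
            } else {
                out.collect(rec); // last output: the original is no longer needed here
            }
        }
    }

    /** Demo helper: wires up n recording outputs and returns what each received, in order. */
    public static <T> List<Rec<T>> deliver(Rec<T> rec, int n) {
        List<Rec<T>> received = new ArrayList<>();
        Set<Out<T>> outputs = new LinkedHashSet<>();
        for (int i = 0; i < n; i++) {
            outputs.add(new Out<T>() {
                @Override
                public void collect(Rec<T> r) {
                    received.add(r);
                }
            });
        }
        collect(rec, outputs);
        return received;
    }

    public static void main(String[] args) {
        Rec<String> original = new Rec<>("a");
        List<Rec<String>> received = deliver(original, 3);
        for (int i = 0; i < received.size(); i++) {
            System.out.println("output " + i + " got original: " + (received.get(i) == original));
        }
    }
}
```

For n selected outputs this performs n - 1 copies instead of n. With a single selected output no copy is made at all.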



[GitHub] flink pull request #2110: [FLINK-3974] Fix object reuse with multi-chaining

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2110#discussion_r68241018
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ChainingITCase.java ---
    @@ -0,0 +1,354 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.runtime.operators;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
    +import org.apache.flink.runtime.execution.Environment;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobVertex;
    +import org.apache.flink.runtime.operators.testutils.MockEnvironment;
    +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
    +import org.apache.flink.runtime.state.AbstractStateBackend;
    +import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    +import org.apache.flink.streaming.api.collector.selector.OutputSelector;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.datastream.SplitStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    +import org.apache.flink.streaming.api.graph.StreamConfig;
    +import org.apache.flink.streaming.api.operators.StreamMap;
    +import org.apache.flink.streaming.api.operators.StreamOperator;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.runtime.tasks.OperatorChain;
    +import org.apache.flink.streaming.runtime.tasks.StreamTask;
    +import org.junit.Assert;
    +import org.junit.Test;
    +import org.mockito.invocation.InvocationOnMock;
    +import org.mockito.stubbing.Answer;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;
    +
    +import static org.hamcrest.Matchers.contains;
    +import static org.mockito.Matchers.any;
    +import static org.mockito.Mockito.doAnswer;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.when;
    +import static org.hamcrest.MatcherAssert.assertThat;
    +
    +/**
    + * Tests for stream operator chaining behaviour.
    + */
    +public class ChainingITCase {
    --- End diff --
    
    I think that this test case no longer needs to be an IT case.



[GitHub] flink pull request #2110: [FLINK-3974] Fix object reuse with multi-chaining

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2110#discussion_r68240257
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
    @@ -386,4 +402,23 @@ public void close() {
     			}
     		}
     	}
    +
    +	/**
    +	 * Special version of {@link BroadcastingOutputCollector} that performs a shallow copy of the
    +	 * {@link StreamRecord} to ensure that multi-chaining works correctly.
    +	 */
    +	private static final class CopyingBroadcastingOutputCollector<T> extends BroadcastingOutputCollector<T> {
    +
    +		public CopyingBroadcastingOutputCollector(Output<StreamRecord<T>>[] outputs) {
    +			super(outputs);
    +		}
    +
    +		@Override
    +		public void collect(StreamRecord<T> record) {
    +			for (Output<StreamRecord<T>> output : outputs) {
    +				StreamRecord<T> shallowCopy = record.copy(record.getValue());
    +				output.collect(shallowCopy);
    +			}
    --- End diff --
    
    Here, the same. I think we could save one copying operation by giving the original `record` to the last output.



[GitHub] flink pull request #2110: [FLINK-3974] Fix object reuse with multi-chaining

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha closed the pull request at:

    https://github.com/apache/flink/pull/2110



[GitHub] flink pull request #2110: [FLINK-3974] Fix object reuse with multi-chaining

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2110#discussion_r68077418
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ChainingITCase.java ---
    @@ -0,0 +1,70 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.runtime.operators;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
    +import org.junit.Test;
    +
    +/**
    + * Tests for stream operator chaining behaviour.
    + */
    +public class ChainingITCase extends StreamingMultipleProgramsTestBase {
    --- End diff --
    
    Do we have to instantiate an expensive IT test case to test the correct chaining behaviour? Maybe we could save some cycles if we don't start a complete Flink cluster for that and test the chained task directly.



[GitHub] flink pull request #2110: [FLINK-3974] Fix object reuse with multi-chaining

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2110#discussion_r68076977
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
    @@ -306,8 +306,9 @@ public ChainingOutput(OneInputStreamOperator<T, ?> operator) {
     		@Override
     		public void collect(StreamRecord<T> record) {
     			try {
    -				operator.setKeyContextElement1(record);
    -				operator.processElement(record);
    +				StreamRecord<T> shallowCopy = record.copy(record.getValue());
    +				operator.setKeyContextElement1(shallowCopy);
    +				operator.processElement(shallowCopy);
    --- End diff --
    
    Actually I'm wondering whether the `ChainingOutput` is the right place to do this copying. Wouldn't it make more sense to do it in the `BroadcastingOutputCollector`? Only if we have a branching chained data flow do we have to make sure that every downstream operator gets its own copy of the record. For simple chaining it should be correct to reuse the stream record.
    
    So I would adapt the `collect` method of `BroadcastingOutputCollector` in the following way:
    
    ```
    public void collect(StreamRecord<T> record) {
    	for (int i = 0; i < outputs.length - 1; i++) {
    		StreamRecord<T> shallowCopy = record.copy(record.getValue());
    		outputs[i].collect(shallowCopy);
    	}
    
    	outputs[outputs.length - 1].collect(record);
    }
    ```



[GitHub] flink issue #2110: [FLINK-3974] Fix object reuse with multi-chaining

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2110
  
    Thanks for the contribution @wanderingbort and @aljoscha. I might be wrong, but maybe there is a slightly better place for the copying operation. Furthermore, I think that it would be beneficial if we could test the correct behaviour without instantiating an expensive IT case.



[GitHub] flink issue #2110: [FLINK-3974] Fix object reuse with multi-chaining

Posted by XuPingyong <gi...@git.apache.org>.
Github user XuPingyong commented on the issue:

    https://github.com/apache/flink/pull/2110
  
    I do not agree with this PR, as it always copies the StreamRecord for the downstream operator.
    
    StreamMap changes the input StreamRecord, so this PR works well for it. But many operators, like StreamFlatMap, do not change/reuse the input StreamRecord.
    
    The following code does not need the extra copy.
    `DataStream<A> input = ...
    input
        .flatMap(FlatMapFunction<A,B>...)
        .addSink(...);
    
    input
        .flatMap(FlatMapFunction<A,C>...)
        .addSink(...);`
    
    So I think we can change StreamMap to not reuse the input StreamRecord, and directly send the StreamRecord if objectReuse is set to true.
    What do you think?
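
The alternative proposed here, a map step that never writes to its input wrapper, could look roughly like this. It is only a sketch under assumptions: `Rec`, `mapWithoutReuse`, and `fanOut` are hypothetical names, not Flink APIs, and the real StreamMap may behave differently from the stand-in.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class NonReusingMapSketch {

    /** Hypothetical record wrapper standing in for StreamRecord. */
    public static final class Rec {
        private final Object value;
        public Rec(Object value) { this.value = value; }
        public Object getValue() { return value; }
    }

    /** Map step that allocates a fresh wrapper for its output and leaves the input untouched. */
    public static Rec mapWithoutReuse(Rec input, Function<Object, Object> fn) {
        return new Rec(fn.apply(input.getValue()));
    }

    /** Sends one shared, uncopied record to two map steps and returns their outputs. */
    public static List<Object> fanOut(Rec shared) {
        List<Object> results = new ArrayList<>();
        results.add(mapWithoutReuse(shared, v -> "n=" + v).getValue());
        results.add(mapWithoutReuse(shared, v -> (Integer) v * 10).getValue());
        return results;
    }

    public static void main(String[] args) {
        Rec shared = new Rec(1);
        System.out.println(fanOut(shared));
        System.out.println("input still holds: " + shared.getValue());
    }
}
```

The trade-off is where the allocation happens: this variant pays for one new wrapper per mapped element inside the operator, while the PR pays for one shallow copy per chained edge, regardless of whether the downstream operator mutates its input.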


---