Posted to issues@flink.apache.org by pnowojski <gi...@git.apache.org> on 2018/05/09 12:17:50 UTC

[GitHub] flink pull request #5977: [FLINK-9295][kafka] Fix transactional.id collision...

GitHub user pnowojski opened a pull request:

    https://github.com/apache/flink/pull/5977

    [FLINK-9295][kafka] Fix transactional.id collisions for FlinkKafkaProducer011

    Previously, if there were two completely independent FlinkKafkaProducer011 data sinks in the job graph, their transactional.ids would collide with one another. The fix is to use the operator's unique ID in addition to the task name and subtask id.
    
    To do that, the operator's unique ID has to be exposed to UDFs via `RuntimeContext`.
    
    This change is backward compatible for recovering from older savepoints, since transactional.ids generated by the old generator will still be used after restoring from state.
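
To make the collision concrete, here is a minimal sketch. It assumes the old ids were derived only from the task name and subtask index, and that the fix adds the operator's unique ID to that prefix. The class `TransactionalIdSketch`, its helper methods, and the exact id layout are hypothetical and do not mirror Flink's `TransactionalIdsGenerator`.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

/** Illustrative only: hypothetical helpers, not Flink's actual id generator. */
final class TransactionalIdSketch {

    /** Old scheme (assumed shape): only the task name and subtask index feed the id. */
    static String oldId(String taskName, int subtaskIndex, long counter) {
        return taskName + "-" + subtaskIndex + "-" + counter;
    }

    /** Fixed scheme (assumed shape): the operator's unique ID is part of the prefix. */
    static String newId(String taskName, String operatorUniqueId, int subtaskIndex, long counter) {
        return taskName + "-" + operatorUniqueId + "-" + subtaskIndex + "-" + counter;
    }

    /** Counts how many distinct ids a list contains. */
    static int distinct(List<String> ids) {
        return new HashSet<>(ids).size();
    }

    public static void main(String[] args) {
        // Two independent sinks that happen to share a task name and subtask index.
        String taskName = "Sink: kafka";
        List<String> oldIds = Arrays.asList(
            oldId(taskName, 0, 0L),
            oldId(taskName, 0, 0L));
        List<String> newIds = Arrays.asList(
            newId(taskName, "operator-a", 0, 0L),
            newId(taskName, "operator-b", 0, 0L));

        System.out.println("old scheme distinct ids: " + distinct(oldIds));
        System.out.println("new scheme distinct ids: " + distinct(newIds));
    }
}
```

Under these assumptions the two sinks share a single id in the old scheme and get two distinct ids in the new one.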
    
    ## Brief change log
    
    Please check individual commit messages
    
    ## Verifying this change
    
    This change adds a new `FlinkKafkaProducer011` test case that covers the bug fix.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
      - The S3 file system connector: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (**yes** / no)
      - If yes, how is the feature documented? (not applicable / docs / **JavaDocs** / not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pnowojski/flink f9295

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5977.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5977
    
----
commit f7ee017f7e87d96182c859ad4c7849e93785e3e9
Author: Piotr Nowojski <pi...@...>
Date:   2018-05-09T09:12:18Z

    [hotfix][tests] Introduce MockEnvironmentBuilder to deduplicate MockEnvironment constructors

commit c19c56205846c13ef33f436ea49eabd5eb670408
Author: Piotr Nowojski <pi...@...>
Date:   2018-05-09T09:29:18Z

    [hotfix][tests] Reduce mockito usage in tests

commit fa5bc99c3029f6fe4b772ee9b0f73dd42fcc445a
Author: Piotr Nowojski <pi...@...>
Date:   2018-05-08T15:46:29Z

    [FLINK-9316][streaming] Expose operator's unique ID in DataStream programs
    
    This allows operators to be identified uniquely and stably across multiple job submissions.
    Previously, two different operators that were executed by tasks with the same name
    were indistinguishable.

commit fe34837a820b8c3db0b758a2a1cdcafaed1813e8
Author: Piotr Nowojski <pi...@...>
Date:   2018-05-08T15:49:31Z

    [FLINK-9295][kafka] Fix two transactional.id collisions for FlinkKafkaProducer011
    
    Previously, if there were two completely independent FlinkKafkaProducer011 data sinks
    in the job graph, their transactional.ids would collide with one another. The fix is to
    use the operator's unique ID in addition to the task name and subtask id.
    
    This change requires a backward compatibility check for recovering from older savepoints.

----


---

[GitHub] flink pull request #5977: [FLINK-9295][kafka] Fix transactional.id collision...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5977#discussion_r187646041
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java ---
    @@ -71,6 +71,17 @@
     	@PublicEvolving
     	MetricGroup getMetricGroup();
     
    +	/**
    +	 * Returned value is guaranteed to be unique between operators within the same job and to be
    +	 * stable and the same across job submissions.
    +	 *
    +	 * <p>This operation is currently only supported in Streaming (DataStream) contexts.
    +	 *
    +	 * @return String representation of the operator's unique id.
    +	 */
    +	@PublicEvolving
    +	String getOperatorUniqueID();
    --- End diff --
    
    That is probably the better solution from the perspective of this particular use case, but do UDFs have access to `StreamingRuntimeContext`? If not, the casting requirement would be a huge blocker for any users wanting to use this, making it almost a private API.
    
    However I do not mind one way or the other.
    
    PS: There are already operations/methods that do not work in streaming/batch.


---

[GitHub] flink pull request #5977: [FLINK-9295][kafka] Fix transactional.id collision...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5977#discussion_r187640541
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java ---
    @@ -71,6 +71,17 @@
     	@PublicEvolving
     	MetricGroup getMetricGroup();
     
    +	/**
    +	 * Returned value is guaranteed to be unique between operators within the same job and to be
    +	 * stable and the same across job submissions.
    +	 *
    +	 * <p>This operation is currently only supported in Streaming (DataStream) contexts.
    +	 *
    +	 * @return String representation of the operator's unique id.
    +	 */
    +	@PublicEvolving
    +	String getOperatorUniqueID();
    --- End diff --
    
    Rather than adding this here and failing for *DataSet* programs, how about adding this to `StreamingRuntimeContext` and casting inside the Kafka Producer? Not super pretty, but nicer than having something that looks like a pretty generic concept (operator id) throw an exception in a whole class of programs (batch jobs). This problem should go away anyway with the batch / streaming unification later.


---

[GitHub] flink pull request #5977: [FLINK-9295][kafka] Fix transactional.id collision...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5977#discussion_r188205000
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java ---
    @@ -71,6 +71,17 @@
     	@PublicEvolving
     	MetricGroup getMetricGroup();
     
    +	/**
    +	 * Returned value is guaranteed to be unique between operators within the same job and to be
    +	 * stable and the same across job submissions.
    +	 *
    +	 * <p>This operation is currently only supported in Streaming (DataStream) contexts.
    +	 *
    +	 * @return String representation of the operator's unique id.
    +	 */
    +	@PublicEvolving
    +	String getOperatorUniqueID();
    --- End diff --
    
    I'm slightly leaning towards Stephan's suggestion, which I also agree is the better solution for this case.
    
    It might be ok to have this as a "hidden" API for now anyways, since 1) it is marked `@PublicEvolving`, and 2) the API was added in quite a short timeframe.
    If we want this fix in 1.5, I wouldn't suggest "fully" exposing it.


---

[GitHub] flink pull request #5977: [FLINK-9295][kafka] Fix transactional.id collision...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5977#discussion_r187592761
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java ---
    @@ -0,0 +1,35 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.util;
    +
    +import org.apache.flink.api.common.typeutils.base.IntSerializer;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.jobgraph.OperatorID;
    +import org.apache.flink.streaming.api.graph.StreamConfig;
    +
    +/**
    + * Handy mock for {@link StreamConfig}.
    + */
    +public class MockStreamConfig extends StreamConfig {
    +	public MockStreamConfig() {
    +		super(new Configuration());
    +
    +		setTypeSerializerIn1(IntSerializer.INSTANCE);
    --- End diff --
    
    Good point, fixed.


---

[GitHub] flink pull request #5977: [FLINK-9295][kafka] Fix transactional.id collision...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5977#discussion_r188725700
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java ---
    @@ -71,6 +71,17 @@
     	@PublicEvolving
     	MetricGroup getMetricGroup();
     
    +	/**
    +	 * Returned value is guaranteed to be unique between operators within the same job and to be
    +	 * stable and the same across job submissions.
    +	 *
    +	 * <p>This operation is currently only supported in Streaming (DataStream) contexts.
    +	 *
    +	 * @return String representation of the operator's unique id.
    +	 */
    +	@PublicEvolving
    +	String getOperatorUniqueID();
    --- End diff --
    
    I am still much in favor of not exposing this in the RuntimeContext:
    
      - Having the state accesses in the RuntimeContext was a necessity of that moment, because there was no `initializeState()` and it is crucial to be exposed to users.
      - This operatorID is not crucial to be exposed to users, hence a very different case to me.
    
      - It is super easy to expose it later, it is much harder (even if marked as PublicEvolving) to hide it later. For a quick move, not exposing an addition publicly should always be the default choice, also beyond this specific case here.



---

[GitHub] flink pull request #5977: [FLINK-9295][kafka] Fix transactional.id collision...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5977


---

[GitHub] flink issue #5977: [FLINK-9295][kafka] Fix transactional.id collisions for F...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/5977
  
    I have removed `getOperatorUniqueID()` from `RuntimeContext` and our `FlinkKafkaProducer011` is now casting `RuntimeContext` to `StreamingRuntimeContext`.
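
The casting approach described above can be sketched roughly as follows. The interfaces `RuntimeContextSketch` and `StreamingRuntimeContextSketch` are hypothetical stand-ins for Flink's `RuntimeContext` and `StreamingRuntimeContext`, and the helper `transactionalIdPrefix` is an assumption made for illustration; this is not the code that was merged.

```java
/** Hypothetical stand-in for the generic context; not Flink's RuntimeContext. */
interface RuntimeContextSketch {
    String getTaskName();
}

/** Hypothetical stand-in for the streaming-only context that carries the operator ID. */
interface StreamingRuntimeContextSketch extends RuntimeContextSketch {
    String getOperatorUniqueID();
}

final class OperatorIdAccess {

    /** Builds the id prefix, failing fast when the context is not a streaming one. */
    static String transactionalIdPrefix(RuntimeContextSketch context) {
        if (!(context instanceof StreamingRuntimeContextSketch)) {
            throw new IllegalStateException(
                "operator unique ID is only available in streaming contexts");
        }
        StreamingRuntimeContextSketch streaming = (StreamingRuntimeContextSketch) context;
        // The task name is kept for readability; the operator ID provides uniqueness.
        return context.getTaskName() + "-" + streaming.getOperatorUniqueID();
    }

    public static void main(String[] args) {
        StreamingRuntimeContextSketch streaming = new StreamingRuntimeContextSketch() {
            @Override public String getTaskName() { return "Sink: kafka"; }
            @Override public String getOperatorUniqueID() { return "operator-a"; }
        };
        System.out.println(transactionalIdPrefix(streaming));

        // The generic interface has one abstract method, so a lambda can play a batch context.
        RuntimeContextSketch batch = () -> "Sink: kafka";
        try {
            transactionalIdPrefix(batch);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Checking the type before casting keeps the streaming-only concept out of the generic interface while still giving a clear error if the sink is ever handed a non-streaming context.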


---

[GitHub] flink pull request #5977: [FLINK-9295][kafka] Fix transactional.id collision...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5977#discussion_r187250723
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java ---
    @@ -0,0 +1,35 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.util;
    +
    +import org.apache.flink.api.common.typeutils.base.IntSerializer;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.jobgraph.OperatorID;
    +import org.apache.flink.streaming.api.graph.StreamConfig;
    +
    +/**
    + * Handy mock for {@link StreamConfig}.
    + */
    +public class MockStreamConfig extends StreamConfig {
    +	public MockStreamConfig() {
    +		super(new Configuration());
    +
    +		setTypeSerializerIn1(IntSerializer.INSTANCE);
    --- End diff --
    
    Not sure about this one. I would expect a `MockStreamConfig` to be generally usable, but this one can mess things up with regard to serializers. I'd just remove the mocking and replace it with:
    ```
    // StreamConfig wraps a Configuration, as in the super(...) call above.
    StreamConfig streamConfig = new StreamConfig(new Configuration());
    streamConfig.setTypeSerializerIn1(IntSerializer.INSTANCE);
    ```


---

[GitHub] flink pull request #5977: [FLINK-9295][kafka] Fix transactional.id collision...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5977#discussion_r188220497
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java ---
    @@ -71,6 +71,17 @@
     	@PublicEvolving
     	MetricGroup getMetricGroup();
     
    +	/**
    +	 * Returned value is guaranteed to be unique between operators within the same job and to be
    +	 * stable and the same across job submissions.
    +	 *
    +	 * <p>This operation is currently only supported in Streaming (DataStream) contexts.
    +	 *
    +	 * @return String representation of the operator's unique id.
    +	 */
    +	@PublicEvolving
    +	String getOperatorUniqueID();
    --- End diff --
    
    I slightly prefer the current approach. Looking at other methods, this seems to be the "established" way for methods that do not exist in batch. Why mix two different ways of dealing with them?


---

[GitHub] flink issue #5977: [FLINK-9295][kafka] Fix transactional.id collisions for F...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5977
  
    Thanks for the update @pnowojski.
    Changes LGTM, +1.
    
    Merging this ..


---

[GitHub] flink pull request #5977: [FLINK-9295][kafka] Fix transactional.id collision...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5977#discussion_r188221002
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java ---
    @@ -71,6 +71,17 @@
     	@PublicEvolving
     	MetricGroup getMetricGroup();
     
    +	/**
    +	 * Returned value is guaranteed to be unique between operators within the same job and to be
    +	 * stable and the same across job submissions.
    +	 *
    +	 * <p>This operation is currently only supported in Streaming (DataStream) contexts.
    +	 *
    +	 * @return String representation of the operator's unique id.
    +	 */
    +	@PublicEvolving
    +	String getOperatorUniqueID();
    --- End diff --
    
    A different point here: I think we would usually like to return `OperatorID` here and not a string, but it is probably in the wrong module. If moving `OperatorID` seems too much, I would suggest calling the method something like `getOperatorIDAsString` to make this a bit more explicit.


---

[GitHub] flink pull request #5977: [FLINK-9295][kafka] Fix transactional.id collision...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5977#discussion_r187645022
  
    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---
    @@ -837,7 +837,7 @@ public void initializeState(FunctionInitializationContext context) throws Except
     		nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(
     			NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
     		transactionalIdsGenerator = new TransactionalIdsGenerator(
    -			getRuntimeContext().getTaskName(),
    +			getRuntimeContext().getTaskName() + "-" + getRuntimeContext().getOperatorUniqueID(),
    --- End diff --
    
    Yes, I wanted to keep the task name for readability reasons.


---

[GitHub] flink issue #5977: [FLINK-9295][kafka] Fix transactional.id collisions for F...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/5977
  
    Overall LGTM 👍 


---

[GitHub] flink pull request #5977: [FLINK-9295][kafka] Fix transactional.id collision...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5977#discussion_r187639708
  
    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---
    @@ -837,7 +837,7 @@ public void initializeState(FunctionInitializationContext context) throws Except
     		nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(
     			NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
     		transactionalIdsGenerator = new TransactionalIdsGenerator(
    -			getRuntimeContext().getTaskName(),
    +			getRuntimeContext().getTaskName() + "-" + getRuntimeContext().getOperatorUniqueID(),
    --- End diff --
    
    You could probably use only the Operator ID here - the task name does not add to the uniqueness. Unless the purpose of the task name is "human readability" in log files or metrics.


---