Posted to issues@flink.apache.org by fanyon <gi...@git.apache.org> on 2017/04/25 02:09:32 UTC

[GitHub] flink pull request #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Parti...

GitHub user fanyon opened a pull request:

    https://github.com/apache/flink/pull/3766

    [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

    1. add extra api addTopicPartitioner, user can use it to add special topic and partitioner
    2. add topicPartitionerMap in FlinkKafkaProducerBase to store the topic and partitioner
    3. add PartitionerInfo to manage the topic and partitioner info


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/fanyon/flink FLINK-6288

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3766.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3766
    
----
commit a525fe605c25ce2e3c8c30cbc7c60542243c0a18
Author: mengji.fy <me...@taobao.com>
Date:   2017-04-24T06:16:48Z

    [FLINK-6288] fix target topic uses partitioner of default topic

commit 071e06c00e8a2346d4ebcede8784f1ada5457da2
Author: mengji.fy <me...@taobao.com>
Date:   2017-04-25T02:03:08Z

    add serialVersionUID field in PartitionerInfo

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

Posted by zjureel <gi...@git.apache.org>.
Github user zjureel commented on the issue:

    https://github.com/apache/flink/pull/3766
  
    @tzulitai Thank you for your suggestion; I think you are right. I will create a new PR from master and cherry-pick my commits for this issue soon.



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

Posted by fanyon <gi...@git.apache.org>.
Github user fanyon commented on the issue:

    https://github.com/apache/flink/pull/3766
  
    @gyfora For the method `int partition(T next, byte[] serializedKey, byte[] serializedValue, int partitionNum)` in KafkaPartitioner, the correct partition count of the target topic can be passed in. But the KafkaPartitioner's partition id array is initialized in `void open(int parallelInstanceId, int parallelInstances, int[] partitions)`, which is executed only once. So yes, the problem with dynamically created topics will still exist when users keep the older KafkaPartitioner API in their existing jobs, and I find it hard to solve this problem completely.
    
    What do you think of this? @tzulitai 
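
The limitation described in this comment can be illustrated with a small sketch. `OldStylePartitioner` is a hypothetical stand-in for a user-written `KafkaPartitioner`, not Flink's class, and the topic sizes are invented for the example. It only assumes what the comment states: the partition id array is captured once in `open`.

```java
// Hypothetical stand-in for an old-style partitioner: the partition id array
// is captured once in open() and reused for every record, whatever its topic.
class OldStylePartitioner {
    private int subtaskIndex;
    private int[] partitions; // partitions of the default topic only

    void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
        this.subtaskIndex = parallelInstanceId;
        this.partitions = partitions;
    }

    int partition(String record, byte[] key, byte[] value, int numPartitions) {
        // Ignores numPartitions and picks from the array cached in open().
        return partitions[subtaskIndex % partitions.length];
    }
}

class OldStyleDemo {
    public static void main(String[] args) {
        OldStylePartitioner p = new OldStylePartitioner();
        // Assumed: the default topic has 8 partitions and this is subtask 5 of 8.
        p.open(5, 8, new int[] {0, 1, 2, 3, 4, 5, 6, 7});
        // Assumed: the record goes to a dynamically created topic with 3 partitions.
        int chosen = p.partition("record", null, null, 3);
        System.out.println("chosen=" + chosen + ", valid for target topic: " + (chosen < 3));
    }
}
```

With these assumed numbers the partitioner returns an id that does not exist in the smaller target topic, even though the correct partition count was passed to `partition`.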



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the issue:

    https://github.com/apache/flink/pull/3766
  
    @tzulitai should we try to get this in the release?



[GitHub] flink pull request #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Parti...

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3766#discussion_r114282783
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---
    @@ -281,6 +307,47 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
     		}
     	}
     
    +	protected int[] getPartitionsByTopic(String topic, KafkaProducer<byte[], byte[]> producer) {
    +		Future<int[]> future = executor.submit(new PartitionMetaTask(topic, producer));
    +
    +		try {
    +			return future.get(kafkaMetaTimeoutMs, TimeUnit.MILLISECONDS);
    +		} catch (Exception e) {
    +			throw new RuntimeException(e);
    --- End diff --
    
    Should we maybe retry here a few times?
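
One possible shape for such a retry is sketched below. `RetryingLookup` and `fetchWithRetry` are hypothetical names that do not appear in the PR, and the lookup is passed in as a plain `Callable` so that the sketch does not depend on Kafka classes. It makes a bounded number of attempts and rethrows the last failure.

```java
import java.util.concurrent.Callable;

// Hypothetical helper, not part of the PR: retries a metadata lookup a bounded
// number of times and rethrows the last failure if every attempt fails.
class RetryingLookup {
    static int[] fetchWithRetry(Callable<int[]> lookup, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return lookup.call();
            } catch (InterruptedException e) {
                // Do not retry after an interrupt; restore the flag and give up.
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted during partition lookup", e);
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        throw new RuntimeException(
                "Partition lookup failed after " + maxAttempts + " attempts", last);
    }
}
```

A real implementation would probably also wait between attempts; that is left out here to keep the sketch short.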



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3766
  
    @zjureel the rebases don't seem to be done correctly. The PR should contain the diff commits only.
    I'm not sure what went wrong, but perhaps the easiest way right now is to cherry-pick your diff commits onto a new branch checked out from the latest master.



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3766
  
    @fanyon I'll finally have some time to get back to this PR this week (perhaps over the next 2 days). Thanks a lot for your patience ...
    
    @gyfora I'm personally a +1 to try to get this in the release because it really is a self-contained thing, but I think it'll probably depend on the status of the 1.3 release in the end.



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3766
  
    Nice following the discussions here :) Let me wrap up the discussion so far:
    
    The old way -
    ```
    interface KafkaPartitioner<T> {
        void open(int subtaskIndex, int numSubtasks, int[] partitions);
        int partition(T record, byte[] key, byte[] value, int numPartitions);
    }
    ```
    
    The (last) proposed new way -
    ```
    interface FlinkKafkaPartitioner<T> {
        void open(int subtaskIndex, int numSubtasks);
        int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
    }
    ```
    and have an internal cache of partition information: `Map<String, PartitionerInfo>`.
    The `PartitionerInfo` can actually just be the partition id array; I don't think we need another wrapper class if we just need a single `FlinkKafkaPartitioner` per subtask for all (including dynamic) topics.
    
    I like the proposal of the new partitioner, as users then do not need to provide multiple partitioners. The only question is how well this works for the general use case, because implementations of the new `partition` method then need to handle different topics (which probably makes sense, because we generally want to treat topics as dynamic anyway). The new way can also allow us to handle upscaled target topics in the future.
    
    For migration, for the dummy wrapper delegation, I think we should just mimic the wrong, old behaviour. That is the behaviour it has always had anyway, so we should not try to alter it if the user is still using the old API. The deprecation notice and Javadoc message are responsible for pushing users to change to the new API.
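
The internal cache mentioned in this summary could look roughly like the sketch below, with the cached value being just the partition id array. `TopicPartitionCache` is a hypothetical name, and the `metadataLookup` function stands in for whatever call the sink would make to Kafka; neither is taken from the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sink-side cache: partition ids are looked up the first time a
// topic is seen, so topics created after job start are handled as well.
class TopicPartitionCache {
    private final Map<String, int[]> partitionsByTopic = new HashMap<>();
    private final Function<String, int[]> metadataLookup; // assumed to query Kafka

    TopicPartitionCache(Function<String, int[]> metadataLookup) {
        this.metadataLookup = metadataLookup;
    }

    // Returns the cached ids, running the lookup only on the first request.
    int[] partitionsFor(String targetTopic) {
        return partitionsByTopic.computeIfAbsent(targetTopic, metadataLookup);
    }
}
```

The sink would then pass `partitionsFor(targetTopic)` as the `partitions` argument of the new `partition` method for each record. This sketch is not thread-safe, on the assumption that each subtask owns its own cache.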



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

Posted by fanyon <gi...@git.apache.org>.
Github user fanyon commented on the issue:

    https://github.com/apache/flink/pull/3766
  
    @tzulitai Thank you for your reply. For 2, I think a new issue can be created later.
    
    For 1, it really is a problem, since it can block the running job. There may be two ways:
    
    1. Depend on the timeout mechanism of Kafka. When fetching partition metadata from Kafka, some timeout configurations should be set.
    2. Use a Future to get the partition metadata from Kafka, so that the user can set the timeout through configuration.
    
    For the first way, the problem may still exist because of network and other reasons, so I'm inclined to use the second way. 
    cc @gyfora 
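
The second way can be sketched with the standard `java.util.concurrent` classes. `TimedLookup` and its `fetch` method are hypothetical names, and the lookup is again passed in as a `Callable` rather than a real Kafka call, so this shows the pattern only and is not the PR's code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: runs the lookup on an executor and waits at most
// timeoutMs for the result.
class TimedLookup {
    static int[] fetch(ExecutorService executor, Callable<int[]> lookup, long timeoutMs) {
        Future<int[]> future = executor.submit(lookup);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the stuck lookup
            throw new RuntimeException(
                    "Partition lookup timed out after " + timeoutMs + " ms", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(
                    "Interrupted while waiting for partition metadata", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Partition lookup failed", e.getCause());
        }
    }
}
```

Cancelling the future on timeout matters here: otherwise a hung lookup keeps occupying the executor thread after the caller has given up.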



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

Posted by fanyon <gi...@git.apache.org>.
Github user fanyon commented on the issue:

    https://github.com/apache/flink/pull/3766
  
    The new API should be in a new class, such as FlinkKafkaPartitioner. An older KafkaPartitioner implementation will be wrapped by FlinkKafkaAdpterPartitioner, which extends FlinkKafkaPartitioner, holds the defaultTopicId/partitions and the KafkaPartitioner delegate, and maps the new API onto the current one. Of course, as it is now, the default topic's partitions will be used for the different target topics in FlinkKafkaAdpterPartitioner.
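
A minimal sketch of such a delegating adapter follows. `LegacyPartitioner`, `NewPartitioner` and `LegacyAdapter` are hypothetical stand-ins for the classes named in the comment, written only to show how the new calls could map onto the old ones.

```java
// Hypothetical stand-ins for the two APIs discussed in the thread.
abstract class LegacyPartitioner<T> {
    public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {}
    public abstract int partition(T next, byte[] serializedKey, byte[] serializedValue,
                                  int numPartitions);
}

interface NewPartitioner<T> {
    void open(int parallelInstanceId, int parallelInstances);
    int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}

// Adapter: exposes a legacy partitioner through the new interface and always
// uses the default topic's partitions, i.e. it preserves the old behaviour.
class LegacyAdapter<T> implements NewPartitioner<T> {
    private final LegacyPartitioner<T> delegate;
    private final int[] defaultTopicPartitions;

    LegacyAdapter(LegacyPartitioner<T> delegate, int[] defaultTopicPartitions) {
        this.delegate = delegate;
        this.defaultTopicPartitions = defaultTopicPartitions;
    }

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        delegate.open(parallelInstanceId, parallelInstances, defaultTopicPartitions);
    }

    @Override
    public int partition(T record, byte[] key, byte[] value,
                         String targetTopic, int[] partitions) {
        // targetTopic and partitions are deliberately ignored.
        return delegate.partition(record, key, value, defaultTopicPartitions.length);
    }
}
```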



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the issue:

    https://github.com/apache/flink/pull/3766
  
    Hi,
    Thanks for the PR!
    
    The first problem I noticed with this approach is that it will not work if users want to partition dynamically created topics (my use case actually). 
    
    We should have a default partitioner that could be applied to the unmatched topics and would always pass the correct partition number.



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3766
  
    @zjureel could you rebase the PR on the latest master? Otherwise I cannot review the PR like this.



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3766
  
    Thanks for the PR @fanyon. I'll try to look at the changes soon.



[GitHub] flink pull request #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Parti...

Posted by fanyon <gi...@git.apache.org>.
Github user fanyon closed the pull request at:

    https://github.com/apache/flink/pull/3766



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the issue:

    https://github.com/apache/flink/pull/3766
  
    I think this is reasonable, as the current implementation doesn't work for dynamically created topics. (We should also deprecate the current one.)
    
    But let's hear what others say :)



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3766
  
    One thing to be careful with, though:
    Since we're now querying Kafka for partition metadata within the `invoke` method, the query must be handled robustly, and we must make sure it doesn't result in unexpectedly longer checkpoint times by blocking the whole stream at the Kafka sink.
    
    Most notably, we need to consider the corner cases where Kafka isn't cooperating nicely:
    1. how to handle arbitrarily long response times when fetching the partition metadata?
    2. how to handle the case where, because some Kafka brokers are temporarily unavailable, the returned partition list is incomplete?
    
    For 2., I can also foresee that we have a separate "partitions update thread" that refreshes the `Map<String, int[]>` cache continuously at a fixed interval. This can also evolve into a `FlinkKafkaPartitioner` that is provided with dynamically changing `int[] partitions` when the `partition` method is invoked.
    
    Perhaps we shouldn't include that in this PR, as it's orthogonal to the API change. Just some food for thought :)
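
The "partitions update thread" idea could be sketched as follows. `RefreshingPartitionCache` is a hypothetical class, the `metadataLookup` function stands in for the real Kafka query, and the refresh interval is left to the caller; none of this is part of the PR.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical cache whose entries are refreshed by a background thread.
class RefreshingPartitionCache implements AutoCloseable {
    private final Map<String, int[]> partitionsByTopic = new ConcurrentHashMap<>();
    private final Function<String, int[]> metadataLookup; // assumed to query Kafka
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "partitions-update-thread");
                t.setDaemon(true);
                return t;
            });

    RefreshingPartitionCache(Function<String, int[]> metadataLookup) {
        this.metadataLookup = metadataLookup;
    }

    // Starts the periodic refresh of all topics seen so far.
    void start(long intervalMs) {
        scheduler.scheduleWithFixedDelay(
                this::refreshAll, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    int[] partitionsFor(String topic) {
        int[] cached = partitionsByTopic.get(topic);
        if (cached == null) {
            cached = metadataLookup.apply(topic);
            partitionsByTopic.put(topic, cached);
        }
        return cached;
    }

    void refreshAll() {
        for (String topic : partitionsByTopic.keySet()) {
            try {
                partitionsByTopic.put(topic, metadataLookup.apply(topic));
            } catch (RuntimeException e) {
                // Keep the previous entry if a refresh fails.
            }
        }
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Keeping the previous entry when a refresh fails is one possible answer to the incomplete-metadata case; whether stale data is acceptable there is a design decision the thread leaves open.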



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

Posted by fanyon <gi...@git.apache.org>.
Github user fanyon commented on the issue:

    https://github.com/apache/flink/pull/3766
  
    @gyfora Thanks for your comment. You are right, and it is a very good question. I originally thought that the user would know all the output topics when the job is submitted, but in real business scenarios the data may be written to dynamically generated topics. 
    
    To support dynamically generated topics, I propose to adjust the open and partition API of KafkaPartitioner as follows:
    1. The open method: remove the parameter int[] partitions; it will be called once for each partitioner
    public void open(int parallelInstanceId, int parallelInstances)
    2. The partition method: add the int[] partitions and target topic parameters
    public int partition(T next, byte[] serializedKey, byte[] serializedValue, String topic, int[] partitions)
    
    @gyfora @tzulitai What do you think of this? Please feel free to give any suggestions, thanks!
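
As an illustration of what a user implementation against the proposed signatures might look like, here is a small key-hash partitioner. `KeyHashPartitioner` is a hypothetical example class, not something from Flink or from the PR; it only relies on the two method signatures listed above.

```java
import java.util.Arrays;

// Hypothetical example written against the proposed signatures; it is not a
// Flink class. Records with the same key go to the same partition of a topic.
class KeyHashPartitioner<T> {
    private int parallelInstanceId;

    public void open(int parallelInstanceId, int parallelInstances) {
        this.parallelInstanceId = parallelInstanceId;
    }

    public int partition(T next, byte[] serializedKey, byte[] serializedValue,
                         String topic, int[] partitions) {
        if (serializedKey == null) {
            // No key: fall back to a fixed partition per subtask.
            return partitions[parallelInstanceId % partitions.length];
        }
        // floorMod keeps the index non-negative for negative hash codes.
        return partitions[Math.floorMod(Arrays.hashCode(serializedKey), partitions.length)];
    }
}
```

Because the partition array arrives with every call, the same instance stays within range for topics of different sizes.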




[GitHub] flink pull request #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Parti...

Posted by fanyon <gi...@git.apache.org>.
Github user fanyon commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3766#discussion_r114466534
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---
    @@ -281,6 +307,47 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
     		}
     	}
     
    +	protected int[] getPartitionsByTopic(String topic, KafkaProducer<byte[], byte[]> producer) {
    +		Future<int[]> future = executor.submit(new PartitionMetaTask(topic, producer));
    +
    +		try {
    +			return future.get(kafkaMetaTimeoutMs, TimeUnit.MILLISECONDS);
    +		} catch (Exception e) {
    +			throw new RuntimeException(e);
    --- End diff --
    
    Yes, retry here will be nicer, I'll fix it, thanks :)



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

Posted by fanyon <gi...@git.apache.org>.
Github user fanyon commented on the issue:

    https://github.com/apache/flink/pull/3766
  
    As discussed above, I have updated the API and code. Please review the changes when you have time, and feel free to give me any suggestions, thanks! @gyfora @tzulitai 



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3766
  
    seems like the relevant commits are e6ec702 and 64af26e.
    Let me try to resolve this .. :)



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

Posted by fanyon <gi...@git.apache.org>.
Github user fanyon commented on the issue:

    https://github.com/apache/flink/pull/3766
  
    @tzulitai I have created a new PR, https://github.com/apache/flink/pull/3901, and I will close this PR. You can review the code there, thanks.



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the issue:

    https://github.com/apache/flink/pull/3766
  
    How would this new API map to the current one in terms of backwards compatibility?



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

Posted by zjureel <gi...@git.apache.org>.
Github user zjureel commented on the issue:

    https://github.com/apache/flink/pull/3766
  
    @tzulitai Thank you for your reply. 
    
    For 1, the `ExecutorService` is used to control the timeout when fetching Kafka partitions. When fetching Kafka partitions, a `Future` is created and executed in the `ExecutorService`; the caller waits for a configured number of milliseconds and throws an exception on timeout.
    For 2, I have deprecated the constructors of 08 / 09 / 010 whose parameter is `KafkaPartitioner` and added the same constructors with a `FlinkKafkaPartitioner` parameter.
    
    I see that the code in master of apache/flink changed considerably a few days ago, and I will try to rebase onto these changes soon. I think you can review these issues after that, thank you.



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3766
  
    @zjureel thanks. You would need a proper rebase: `git rebase master` when you finish your feature branch, instead of merging the latest master.
    
    Regarding timeout: doesn't the Kafka client have built-in timeout functionality?
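
For reference, a sketch of relying on the client's own timeout instead. In the Kafka Java producer from version 0.9 on, the `max.block.ms` setting bounds how long `KafkaProducer.send()` and `KafkaProducer.partitionsFor()` may block; older clients use different settings, so whether this covers every connector version touched by the PR is an open assumption. `ProducerTimeoutConfig` is a hypothetical helper name.

```java
import java.util.Properties;

// Hypothetical helper: returns a copy of the producer properties with the
// client-side blocking limit set.
class ProducerTimeoutConfig {
    static Properties withMaxBlockMs(Properties base, long maxBlockMs) {
        Properties props = new Properties();
        props.putAll(base);
        // "max.block.ms" bounds how long KafkaProducer.send() and
        // KafkaProducer.partitionsFor() may block (Kafka 0.9 and later clients).
        props.setProperty("max.block.ms", Long.toString(maxBlockMs));
        return props;
    }
}
```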



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the issue:

    https://github.com/apache/flink/pull/3766
  
    I liked the proposed API and I agree that it's probably best to keep the old behaviour for the deprecated API.
    
    I don't think fetching the Kafka partition info should be a huge problem, as it shouldn't happen too often and Kafka should be able to return the info if you can write to it. We of course need some timeout/retry mechanism to not fail unnecessarily.
    
    The producer itself is not very resilient to errors in its current state, as it can't really handle the async errors; it will just throw them and fail. 

