You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by le...@apache.org on 2021/06/01 21:47:59 UTC

[gobblin] 01/02: GOBBLIN-1455: Limit gobblin.kafka.minContainersForTopic config to the number of partitions of the topic

This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git

commit c9885d81a63747228bcd1b6a403dd017109703dc
Author: suvasude <su...@linkedin.biz>
AuthorDate: Tue Jun 1 10:52:25 2021 -0700

    GOBBLIN-1455: Limit gobblin.kafka.minContainersForTopic config to the number of partitions of the topic
---
 .../kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java        | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
index f01b648..3b93ab2 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
@@ -282,7 +282,8 @@ public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker {
    * @return the minimum workunit size.
    */
   private Double getMinWorkUnitSize(WorkUnit workUnit) {
-    int minContainersForTopic = workUnit.getPropAsInt(MIN_CONTAINERS_FOR_TOPIC, -1);
+    int minContainersForTopic = Math.min(workUnit.getPropAsInt(MIN_CONTAINERS_FOR_TOPIC, -1),
+        workUnit.getPropAsInt(KafkaSource.NUM_TOPIC_PARTITIONS));
     if (minContainersForTopic == -1) {
       //No minimum configured? Return lower bound for workunit size to be 0.
       return 0.0;