You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2022/06/01 21:39:52 UTC

[gobblin] branch master updated: [GOBBLIN-1654] Add capacity floor to avoid aggressively requesting resource and small files. (#3515)

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new e814aafad [GOBBLIN-1654] Add capacity floor to avoid aggressively requesting resource and small files. (#3515)
e814aafad is described below

commit e814aafad32f81948e6d49f39a50e5a2ae117884
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Wed Jun 1 14:39:47 2022 -0700

    [GOBBLIN-1654] Add capacity floor to avoid aggressively requesting resource and small files. (#3515)
    
    * address comments
    
    * use connectionmanager when httpclient is not closeable
    
    * [GOBBLIN-1654] Add capacity floor to avoid aggressively requesting resource and small files.
    
    * address comments
    
    Co-authored-by: Zihan Li <zi...@zihli-mn2.linkedin.biz>
---
 .../packer/KafkaTopicGroupingWorkUnitPacker.java   | 23 +++++++++++++++++++++-
 1 file changed, 22 insertions(+), 1 deletion(-)

diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
index 3b93ab216..62cb18447 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
 import org.apache.hadoop.fs.Path;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -79,6 +80,13 @@ public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker {
   public static final String CONTAINER_CAPACITY_KEY = GOBBLIN_KAFKA_PREFIX + "streaming.containerCapacity";
   public static final double DEFAULT_CONTAINER_CAPACITY = 10;
 
+  // minimum container capacity to avoid bad topic schema causing us to request resources aggressively
+  public static final String MINIMUM_CONTAINER_CAPACITY = GOBBLIN_KAFKA_PREFIX + "streaming.minimum.containerCapacity";
+  public static final double DEFAULT_MINIMUM_CONTAINER_CAPACITY = 1;
+  public static final String TOPIC_PARTITION_WITH_LOW_CAPACITY_EVENT_NAME = "topicPartitionWithLowCapacity";
+  public static final String TOPIC_PARTITION = "topicPartition";
+  public static final String TOPIC_PARTITION_CAPACITY = "topicPartitionCapacity";
+
   //A boolean flag to enable per-topic container capacity, where "container capacity" is as defined earlier. This
   // configuration is useful in scenarios where the write performance can vary significantly across topics due to differences
   // in schema, as in the case of columnar formats such as ORC and Parquet. When enabled, the bin packing algorithm uses
@@ -125,6 +133,7 @@ public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker {
   private static final String NUM_CONTAINERS_EVENT_NAME = "NumContainers";
 
   private final long packingStartTimeMillis;
+  private final double minimumContainerCapacity;
   private final Optional<StateStoreBasedWatermarkStorage> watermarkStorage;
   private final Optional<MetricContext> metricContext;
   private final boolean isStatsBasedPackingEnabled;
@@ -139,6 +148,7 @@ public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker {
       Optional<MetricContext> metricContext) {
     super(source, state);
     this.state = state;
+    this.minimumContainerCapacity = state.getPropAsDouble(MINIMUM_CONTAINER_CAPACITY, DEFAULT_MINIMUM_CONTAINER_CAPACITY);
     this.isStatsBasedPackingEnabled =
         state.getPropAsBoolean(IS_STATS_BASED_PACKING_ENABLED_KEY, DEFAULT_IS_STATS_BASED_PACKING_ENABLED);
     this.isPerTopicContainerCapacityEnabled = state
@@ -245,7 +255,18 @@ public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker {
       if (this.isPerTopicContainerCapacityEnabled) {
         String topicName = KafkaUtils.getTopicNameFromTopicPartition(topicPartition);
         List<Double> capacities = capacitiesByTopic.getOrDefault(topicName, Lists.newArrayList());
-        capacities.add(watermark.getAvgConsumeRate() > 0 ? watermark.getAvgConsumeRate() : DEFAULT_CONTAINER_CAPACITY);
+        double realCapacity = watermark.getAvgConsumeRate() > 0 ? watermark.getAvgConsumeRate() : DEFAULT_CONTAINER_CAPACITY;
+        if (realCapacity < minimumContainerCapacity) {
+          if (this.metricContext.isPresent()) {
+            GobblinEventBuilder event = new GobblinEventBuilder(TOPIC_PARTITION_WITH_LOW_CAPACITY_EVENT_NAME);
+            event.addMetadata(TOPIC_PARTITION, topicPartition);
+            event.addMetadata(TOPIC_PARTITION_CAPACITY, String.valueOf(realCapacity));
+            this.eventSubmitter.submit(event);
+          }
+          log.warn(String.format("topicPartition %s has lower capacity %s, ignore that and reset capacity to be %s", topicPartition, realCapacity, minimumContainerCapacity));
+          realCapacity = minimumContainerCapacity;
+        }
+        capacities.add(realCapacity);
         capacitiesByTopic.put(topicName, capacities);
       }
     }