You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2022/06/01 21:39:52 UTC
[gobblin] branch master updated: [GOBBLIN-1654] Add capacity floor to avoid aggressively requesting resource and small files. (#3515)
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new e814aafad [GOBBLIN-1654] Add capacity floor to avoid aggressively requesting resource and small files. (#3515)
e814aafad is described below
commit e814aafad32f81948e6d49f39a50e5a2ae117884
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Wed Jun 1 14:39:47 2022 -0700
[GOBBLIN-1654] Add capacity floor to avoid aggressively requesting resource and small files. (#3515)
* address comments
* use ConnectionManager when HttpClient is not closeable
* [GOBBLIN-1654] Add capacity floor to avoid aggressively requesting resource and small files.
* address comments
Co-authored-by: Zihan Li <zi...@zihli-mn2.linkedin.biz>
---
.../packer/KafkaTopicGroupingWorkUnitPacker.java | 23 +++++++++++++++++++++-
1 file changed, 22 insertions(+), 1 deletion(-)
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
index 3b93ab216..62cb18447 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.hadoop.fs.Path;
import com.google.common.annotations.VisibleForTesting;
@@ -79,6 +80,13 @@ public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker {
public static final String CONTAINER_CAPACITY_KEY = GOBBLIN_KAFKA_PREFIX + "streaming.containerCapacity";
public static final double DEFAULT_CONTAINER_CAPACITY = 10;
+ // minimum container capacity to avoid bad topic schema causing us to request resources aggressively
+ public static final String MINIMUM_CONTAINER_CAPACITY = GOBBLIN_KAFKA_PREFIX + "streaming.minimum.containerCapacity";
+ public static final double DEFAULT_MINIMUM_CONTAINER_CAPACITY = 1;
+ public static final String TOPIC_PARTITION_WITH_LOW_CAPACITY_EVENT_NAME = "topicPartitionWithLowCapacity";
+ public static final String TOPIC_PARTITION = "topicPartition";
+ public static final String TOPIC_PARTITION_CAPACITY = "topicPartitionCapacity";
+
//A boolean flag to enable per-topic container capacity, where "container capacity" is as defined earlier. This
// configuration is useful in scenarios where the write performance can vary significantly across topics due to differences
// in schema, as in the case of columnar formats such as ORC and Parquet. When enabled, the bin packing algorithm uses
@@ -125,6 +133,7 @@ public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker {
private static final String NUM_CONTAINERS_EVENT_NAME = "NumContainers";
private final long packingStartTimeMillis;
+ private final double minimumContainerCapacity;
private final Optional<StateStoreBasedWatermarkStorage> watermarkStorage;
private final Optional<MetricContext> metricContext;
private final boolean isStatsBasedPackingEnabled;
@@ -139,6 +148,7 @@ public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker {
Optional<MetricContext> metricContext) {
super(source, state);
this.state = state;
+ this.minimumContainerCapacity = state.getPropAsDouble(MINIMUM_CONTAINER_CAPACITY, DEFAULT_MINIMUM_CONTAINER_CAPACITY);
this.isStatsBasedPackingEnabled =
state.getPropAsBoolean(IS_STATS_BASED_PACKING_ENABLED_KEY, DEFAULT_IS_STATS_BASED_PACKING_ENABLED);
this.isPerTopicContainerCapacityEnabled = state
@@ -245,7 +255,18 @@ public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker {
if (this.isPerTopicContainerCapacityEnabled) {
String topicName = KafkaUtils.getTopicNameFromTopicPartition(topicPartition);
List<Double> capacities = capacitiesByTopic.getOrDefault(topicName, Lists.newArrayList());
- capacities.add(watermark.getAvgConsumeRate() > 0 ? watermark.getAvgConsumeRate() : DEFAULT_CONTAINER_CAPACITY);
+ double realCapacity = watermark.getAvgConsumeRate() > 0 ? watermark.getAvgConsumeRate() : DEFAULT_CONTAINER_CAPACITY;
+ if (realCapacity < minimumContainerCapacity) {
+ if (this.metricContext.isPresent()) {
+ GobblinEventBuilder event = new GobblinEventBuilder(TOPIC_PARTITION_WITH_LOW_CAPACITY_EVENT_NAME);
+ event.addMetadata(TOPIC_PARTITION, topicPartition);
+ event.addMetadata(TOPIC_PARTITION_CAPACITY, String.valueOf(realCapacity));
+ this.eventSubmitter.submit(event);
+ }
+ log.warn(String.format("topicPartition %s has lower capacity %s, ignore that and reset capacity to be %s", topicPartition, realCapacity, minimumContainerCapacity));
+ realCapacity = minimumContainerCapacity;
+ }
+ capacities.add(realCapacity);
capacitiesByTopic.put(topicName, capacities);
}
}