You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2021/05/25 00:21:48 UTC
[gobblin] branch master updated: [GOBBLIN-1449] Correctly parse
topic name from topic partition string if topic name has hyphens
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2488c3f [GOBBLIN-1449] Correctly parse topic name from topic partition string if topic name has hyphens
2488c3f is described below
commit 2488c3fa3e38e32e339162ac472f2e8ad035c6c4
Author: suvasude <su...@linkedin.biz>
AuthorDate: Mon May 24 17:21:42 2021 -0700
[GOBBLIN-1449] Correctly parse topic name from topic partition string if topic name has hyphens
Closes #3287 from sv2000/topicNameFix
---
.../source/extractor/extract/kafka/KafkaUtils.java | 15 +++++++++++
.../packer/KafkaTopicGroupingWorkUnitPacker.java | 11 +++++---
.../extractor/extract/kafka/KafkaUtilsTest.java | 30 ++++++++++++++++++++++
3 files changed, 52 insertions(+), 4 deletions(-)
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java
index f5b52b7..4dc41ef 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java
@@ -23,7 +23,9 @@ import org.apache.gobblin.configuration.WorkUnitState;
import java.util.List;
+import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
@@ -35,6 +37,8 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class KafkaUtils {
+ private static final String TOPIC_PARTITION_DELIMITER = "-";
+
/**
* Get topic name from a {@link State} object. The {@link State} should contain property
* {@link KafkaSource#TOPIC_NAME}.
@@ -182,4 +186,15 @@ public class KafkaUtils {
return Long.parseLong(workUnitState.contains(key) ? workUnitState.getProp(key)
: workUnitState.getProp(KafkaUtils.getPartitionPropName(key, partitionId), "0"));
}
+
+ /**
+ * Get the topic name from a topic partition string by dropping the last "-"-delimited token, so hyphens inside the topic name are kept.
+ * @param topicPartition topic partition string; must contain at least one "-", otherwise the precondition check fails
+ */
+ public static String getTopicNameFromTopicPartition(String topicPartition) {
+ Preconditions.checkArgument(topicPartition.contains(TOPIC_PARTITION_DELIMITER));
+ List<String> parts = Splitter.on(TOPIC_PARTITION_DELIMITER).splitToList(topicPartition);
+ return Joiner.on(TOPIC_PARTITION_DELIMITER).join(parts.subList(0, parts.size() - 1)); // rejoin every token except the last one
+ }
+
}
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
index 17a7089..f01b648 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.fs.Path;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
@@ -46,6 +45,7 @@ import org.apache.gobblin.source.extractor.extract.AbstractSource;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaUtils;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.io.GsonInterfaceAdapter;
@@ -70,7 +70,6 @@ import static org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.
public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker {
public static final String GOBBLIN_KAFKA_PREFIX = "gobblin.kafka.";
private static final int DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER = 10;
- private static final String TOPIC_PARTITION_DELIMITER = "-";
//A global configuration for container capacity. The container capacity refers to the peak rate (in MB/s) that a
//single JVM can consume from Kafka for a single topic and controls the number of partitions of a topic that will be
@@ -224,6 +223,7 @@ public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker {
return squeezedGroups;
}
+
/**
* TODO: This method should be moved into {@link KafkaSource}, which requires moving classes such
* as {@link KafkaStreamingExtractor.KafkaWatermark} to the open source. A side-effect of this method is to
@@ -243,7 +243,7 @@ public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker {
GSON.fromJson(state.getProp(topicPartition), KafkaStreamingExtractor.KafkaWatermark.class);
lastCommittedWatermarks.put(topicPartition, watermark);
if (this.isPerTopicContainerCapacityEnabled) {
- String topicName = topicPartition.split(TOPIC_PARTITION_DELIMITER)[0];
+ String topicName = KafkaUtils.getTopicNameFromTopicPartition(topicPartition);
List<Double> capacities = capacitiesByTopic.getOrDefault(topicName, Lists.newArrayList());
capacities.add(watermark.getAvgConsumeRate() > 0 ? watermark.getAvgConsumeRate() : DEFAULT_CONTAINER_CAPACITY);
capacitiesByTopic.put(topicName, capacities);
@@ -338,7 +338,10 @@ public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker {
*/
@VisibleForTesting
static double getContainerCapacityForTopic(List<Double> capacities, ContainerCapacityComputationStrategy strategy) {
- Preconditions.checkArgument(capacities.size() > 0, "Capacities size must be > 0");
+ //No previous stats for a topic? Return default.
+ if (capacities == null) {
+ return DEFAULT_CONTAINER_CAPACITY;
+ }
Collections.sort(capacities);
log.info("Capacity computation strategy: {}, capacities: {}", strategy.name(), capacities);
switch (strategy) {
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtilsTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtilsTest.java
new file mode 100644
index 0000000..3683b8b
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtilsTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class KafkaUtilsTest { // unit tests for the static helpers in KafkaUtils
+
+ @Test
+ public void testGetTopicNameFromTopicPartition() {
+ Assert.assertEquals(KafkaUtils.getTopicNameFromTopicPartition("topic-1"), "topic"); // topic name without hyphens
+ Assert.assertEquals(KafkaUtils.getTopicNameFromTopicPartition("topic-foo-bar-1"), "topic-foo-bar"); // hyphenated topic name: only the trailing token is removed
+ }
+}
\ No newline at end of file