You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2021/05/25 00:21:48 UTC

[gobblin] branch master updated: [GOBBLIN-1449] Correctly parse topic name from topic partition string if topic name has hyphens

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2488c3f  [GOBBLIN-1449] Correctly parse topic name from topic partition string if topic name has hyphens
2488c3f is described below

commit 2488c3fa3e38e32e339162ac472f2e8ad035c6c4
Author: suvasude <su...@linkedin.biz>
AuthorDate: Mon May 24 17:21:42 2021 -0700

    [GOBBLIN-1449] Correctly parse topic name from topic partition string if topic name has hyphens
    
    Closes #3287 from sv2000/topicNameFix
---
 .../source/extractor/extract/kafka/KafkaUtils.java | 15 +++++++++++
 .../packer/KafkaTopicGroupingWorkUnitPacker.java   | 11 +++++---
 .../extractor/extract/kafka/KafkaUtilsTest.java    | 30 ++++++++++++++++++++++
 3 files changed, 52 insertions(+), 4 deletions(-)

diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java
index f5b52b7..4dc41ef 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java
@@ -23,7 +23,9 @@ import org.apache.gobblin.configuration.WorkUnitState;
 
 import java.util.List;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
 
 import lombok.extern.slf4j.Slf4j;
@@ -35,6 +37,8 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class KafkaUtils {
 
+  private static final String TOPIC_PARTITION_DELIMITER = "-";
+
   /**
    * Get topic name from a {@link State} object. The {@link State} should contain property
    * {@link KafkaSource#TOPIC_NAME}.
@@ -182,4 +186,15 @@ public class KafkaUtils {
     return Long.parseLong(workUnitState.contains(key) ? workUnitState.getProp(key)
         : workUnitState.getProp(KafkaUtils.getPartitionPropName(key, partitionId), "0"));
   }
+
+  /**
+   * Get the topic name from a {@code <topic>-<partition>} string by dropping the last hyphen-delimited token.
+   * @param topicPartition e.g. {@code "topic-foo-bar-1"} (yields {@code "topic-foo-bar"}); must contain a hyphen
+   */
+  public static String getTopicNameFromTopicPartition(String topicPartition) {
+    List<String> parts = Splitter.on(TOPIC_PARTITION_DELIMITER).splitToList(topicPartition);
+    Preconditions.checkArgument(parts.size() > 1, "Invalid topic partition: %s", topicPartition);
+    return Joiner.on(TOPIC_PARTITION_DELIMITER).join(parts.subList(0, parts.size() - 1));
+  }
+
 }
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
index 17a7089..f01b648 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.fs.Path;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.gson.Gson;
@@ -46,6 +45,7 @@ import org.apache.gobblin.source.extractor.extract.AbstractSource;
 import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
 import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
 import org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaUtils;
 import org.apache.gobblin.source.workunit.MultiWorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.io.GsonInterfaceAdapter;
@@ -70,7 +70,6 @@ import static org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.
 public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker {
   public static final String GOBBLIN_KAFKA_PREFIX = "gobblin.kafka.";
   private static final int DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER = 10;
-  private static final String TOPIC_PARTITION_DELIMITER = "-";
 
   //A global configuration for container capacity. The container capacity refers to the peak rate (in MB/s) that a
   //single JVM can consume from Kafka for a single topic and controls the number of partitions of a topic that will be
@@ -224,6 +223,7 @@ public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker {
     return squeezedGroups;
   }
 
+
   /**
    * TODO: This method should be moved into {@link KafkaSource}, which requires moving classes such
    * as {@link KafkaStreamingExtractor.KafkaWatermark} to the open source. A side-effect of this method is to
@@ -243,7 +243,7 @@ public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker {
           GSON.fromJson(state.getProp(topicPartition), KafkaStreamingExtractor.KafkaWatermark.class);
       lastCommittedWatermarks.put(topicPartition, watermark);
       if (this.isPerTopicContainerCapacityEnabled) {
-        String topicName = topicPartition.split(TOPIC_PARTITION_DELIMITER)[0];
+        String topicName = KafkaUtils.getTopicNameFromTopicPartition(topicPartition);
         List<Double> capacities = capacitiesByTopic.getOrDefault(topicName, Lists.newArrayList());
         capacities.add(watermark.getAvgConsumeRate() > 0 ? watermark.getAvgConsumeRate() : DEFAULT_CONTAINER_CAPACITY);
         capacitiesByTopic.put(topicName, capacities);
@@ -338,7 +338,10 @@ public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker {
    */
   @VisibleForTesting
   static double getContainerCapacityForTopic(List<Double> capacities, ContainerCapacityComputationStrategy strategy) {
-    Preconditions.checkArgument(capacities.size() > 0, "Capacities size must be > 0");
+    //No previous stats for a topic? Return default.
+    if (capacities == null) {
+      return DEFAULT_CONTAINER_CAPACITY;
+    }
     Collections.sort(capacities);
     log.info("Capacity computation strategy: {}, capacities: {}", strategy.name(), capacities);
     switch (strategy) {
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtilsTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtilsTest.java
new file mode 100644
index 0000000..3683b8b
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtilsTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class KafkaUtilsTest {
+  @Test
+  public void testGetTopicNameFromTopicPartition() {
+    Assert.assertEquals(KafkaUtils.getTopicNameFromTopicPartition("topic-1"), "topic");
+    Assert.assertEquals(KafkaUtils.getTopicNameFromTopicPartition("topic-foo-bar-1"), "topic-foo-bar");
+    Assert.assertEquals(KafkaUtils.getTopicNameFromTopicPartition("topic--10"), "topic-");
+  }
+}
\ No newline at end of file