You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2021/06/25 16:37:58 UTC

[hudi] branch master updated: [HUDI-2060] Added tests for KafkaOffsetGen (#3136)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ed1a5da  [HUDI-2060] Added tests for KafkaOffsetGen (#3136)
ed1a5da is described below

commit ed1a5daa9a15e9123aa7fdba5ce8262d1cae0704
Author: Vinay Patil <52...@users.noreply.github.com>
AuthorDate: Fri Jun 25 22:07:47 2021 +0530

    [HUDI-2060] Added tests for KafkaOffsetGen (#3136)
---
 .../sources/helpers/TestKafkaOffsetGen.java        | 147 +++++++++++++++++++++
 1 file changed, 147 insertions(+)

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
new file mode 100644
index 0000000..508c90a
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
+import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.spark.streaming.kafka010.KafkaTestUtils;
+import org.apache.spark.streaming.kafka010.OffsetRange;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests against {@link KafkaOffsetGen}.
+ *
+ * <p>Every test gets its own {@link KafkaTestUtils} instance, set up before and torn down after
+ * the test, so the same topic name can be reused across tests.
+ */
+public class TestKafkaOffsetGen {
+
+  private static final String TEST_TOPIC_NAME = "hoodie_test";
+  private static final String KAFKA_TOPIC_PROP = "hoodie.deltastreamer.source.kafka.topic";
+  private static final int NUM_RECORDS = 1000;
+
+  private final HoodieDeltaStreamerMetrics metrics = mock(HoodieDeltaStreamerMetrics.class);
+  private KafkaTestUtils testUtils;
+
+  /** Creates and sets up a fresh {@link KafkaTestUtils} for the test about to run. */
+  @BeforeEach
+  public void setup() throws Exception {
+    testUtils = new KafkaTestUtils();
+    testUtils.setup();
+  }
+
+  /** Tears down the {@link KafkaTestUtils} created in {@link #setup()}. */
+  @AfterEach
+  public void teardown() throws Exception {
+    testUtils.teardown();
+  }
+
+  /**
+   * Builds the properties shared by {@link KafkaOffsetGen} and the plain {@link KafkaConsumer}
+   * used in these tests: auto offset reset, source topic, broker address and String
+   * key/value deserializers.
+   *
+   * @param autoOffsetReset value stored under {@link Config#KAFKA_AUTO_OFFSET_RESET}
+   * @return the populated properties
+   */
+  private TypedProperties getConsumerConfigs(String autoOffsetReset) {
+    TypedProperties props = new TypedProperties();
+    props.put(Config.KAFKA_AUTO_OFFSET_RESET, autoOffsetReset);
+    props.put(KAFKA_TOPIC_PROP, TEST_TOPIC_NAME);
+    props.setProperty("bootstrap.servers", testUtils.brokerAddress());
+    props.setProperty("key.deserializer", StringDeserializer.class.getName());
+    props.setProperty("value.deserializer", StringDeserializer.class.getName());
+    return props;
+  }
+
+  /**
+   * Creates {@link #TEST_TOPIC_NAME} with the given number of partitions and sends
+   * {@link #NUM_RECORDS} generated insert records to it as JSON strings.
+   *
+   * @param numPartitions number of partitions to create the topic with
+   */
+  private void createTopicWithRecords(int numPartitions) {
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    testUtils.createTopic(TEST_TOPIC_NAME, numPartitions);
+    testUtils.sendMessages(TEST_TOPIC_NAME,
+        Helpers.jsonifyRecords(dataGenerator.generateInserts("000", NUM_RECORDS)));
+  }
+
+  /**
+   * Without a checkpoint and with the reset config set to "earliest", the range starts at
+   * offset 0 and its end is bounded by both the requested limit and the records sent.
+   */
+  @Test
+  public void testGetNextOffsetRangesFromEarliest() {
+    createTopicWithRecords(1);
+    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest"));
+
+    // Limit smaller than the number of records sent: the range ends at the limit.
+    OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 500, metrics);
+    assertEquals(1, nextOffsetRanges.length);
+    assertEquals(0, nextOffsetRanges[0].fromOffset());
+    assertEquals(500, nextOffsetRanges[0].untilOffset());
+
+    // Limit larger than the number of records sent: the range ends at the last record.
+    nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 5000, metrics);
+    assertEquals(1, nextOffsetRanges.length);
+    assertEquals(0, nextOffsetRanges[0].fromOffset());
+    assertEquals(1000, nextOffsetRanges[0].untilOffset());
+  }
+
+  /**
+   * Without a checkpoint and with the reset config set to "latest", the returned range is
+   * empty and sits at the end of the records already sent.
+   */
+  @Test
+  public void testGetNextOffsetRangesFromLatest() {
+    createTopicWithRecords(1);
+    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("latest"));
+
+    OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 500, metrics);
+    assertEquals(1, nextOffsetRanges.length);
+    assertEquals(1000, nextOffsetRanges[0].fromOffset());
+    assertEquals(1000, nextOffsetRanges[0].untilOffset());
+  }
+
+  /**
+   * With a checkpoint at offset 250 of partition 0, the range starts at the checkpoint even
+   * though the reset config is "latest", and spans the requested 500 events.
+   */
+  @Test
+  public void testGetNextOffsetRangesFromCheckpoint() {
+    String lastCheckpointString = TEST_TOPIC_NAME + ",0:250";
+    createTopicWithRecords(1);
+    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("latest"));
+
+    OffsetRange[] nextOffsetRanges =
+        kafkaOffsetGen.getNextOffsetRanges(Option.of(lastCheckpointString), 500, metrics);
+    assertEquals(1, nextOffsetRanges.length);
+    assertEquals(250, nextOffsetRanges[0].fromOffset());
+    assertEquals(750, nextOffsetRanges[0].untilOffset());
+  }
+
+  /**
+   * With a two-partition topic and an odd limit of 499, one range per partition is returned
+   * and the limit is split as 250 + 249.
+   */
+  @Test
+  public void testGetNextOffsetRangesFromMultiplePartitions() {
+    createTopicWithRecords(2);
+    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest"));
+
+    OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 499, metrics);
+    assertEquals(2, nextOffsetRanges.length);
+    assertEquals(0, nextOffsetRanges[0].fromOffset());
+    assertEquals(250, nextOffsetRanges[0].untilOffset());
+    assertEquals(0, nextOffsetRanges[1].fromOffset());
+    assertEquals(249, nextOffsetRanges[1].untilOffset());
+  }
+
+  /**
+   * {@link KafkaOffsetGen#checkTopicExists} reports true for the topic that was created and
+   * false once the configured topic is changed to one that was never created.
+   */
+  @Test
+  public void testCheckTopicExists() {
+    TypedProperties props = getConsumerConfigs("latest");
+    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(props);
+    testUtils.createTopic(TEST_TOPIC_NAME, 1);
+    // The test owns each consumer it creates, so it closes them via try-with-resources.
+    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
+      assertTrue(kafkaOffsetGen.checkTopicExists(consumer));
+    }
+
+    props.put(KAFKA_TOPIC_PROP, "random");
+    kafkaOffsetGen = new KafkaOffsetGen(props);
+    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
+      assertFalse(kafkaOffsetGen.checkTopicExists(consumer));
+    }
+  }
+
+  /** Constructing a {@link KafkaOffsetGen} from properties without a topic is rejected. */
+  @Test
+  public void testTopicNameNotPresentInProps() {
+    assertThrows(HoodieNotSupportedException.class, () -> new KafkaOffsetGen(new TypedProperties()));
+  }
+}