Posted to commits@hudi.apache.org by "nsivabalan (via GitHub)" <gi...@apache.org> on 2023/02/21 22:31:58 UTC

[GitHub] [hudi] nsivabalan commented on a diff in pull request #8011: [HUDI-5808] Add Support for kafka offsets in jsonkafkasource and avrokafkasource

nsivabalan commented on code in PR #8011:
URL: https://github.com/apache/hudi/pull/8011#discussion_r1113628425


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java:
##########
@@ -507,6 +508,13 @@ public static SchemaProviderWithPostProcessor wrapSchemaProviderWithPostProcesso
         Option.ofNullable(createSchemaPostProcessor(schemaPostProcessorClass, cfg, jssc)));
   }
 
+  public static SchemaProvider schemaProviderWithKafkaOffsetPostProcessorIfNeeded(SchemaProvider provider, TypedProperties cfg, JavaSparkContext jssc) {

Review Comment:
   getSchemaProviderForKafkaSource should suffice. Let's add javadocs to clarify what this method is doing. It's too lengthy to include KafkaPostProcessor in the name.
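   For illustration, a rough sketch of the rename plus javadoc (the wrapping call and the KafkaOffsetPostProcessor constructor shown here are assumptions; the actual body in the PR may differ):
   
   ```java
   /**
    * Resolves the schema provider to use for a Kafka source: if appending Kafka offsets is
    * enabled, wraps the given provider with a KafkaOffsetPostProcessor so the resolved schema
    * also contains the offset/partition/timestamp columns; otherwise returns it unchanged.
    */
   public static SchemaProvider getSchemaProviderForKafkaSource(SchemaProvider provider, TypedProperties cfg, JavaSparkContext jssc) {
     if (KafkaOffsetPostProcessor.Config.shouldAddOffsets(cfg)) {
       // assumed wiring; the exact post-processor construction may differ in the PR
       return new SchemaProviderWithPostProcessor(provider, Option.ofNullable(new KafkaOffsetPostProcessor(cfg, jssc)));
     }
     return provider;
   }
   ```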



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.schema;
+
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.TypedProperties;
+
+import org.apache.avro.Schema;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Used internally to add Kafka offsets. You should probably not set
+ * hoodie.deltastreamer.schemaprovider.schema_post_processor to this class
+ * */
+public class KafkaOffsetPostProcessor extends SchemaPostProcessor {
+
+  public static class Config {
+    public static final ConfigProperty<String> KAFKA_APPEND_OFFSETS = ConfigProperty

Review Comment:
   Don't you need to add a config annotation?
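   Something along these lines, assuming the usual Hudi config conventions (the annotation values, key name, and default below are illustrative, not necessarily what the PR uses):
   
   ```java
   @ConfigClassProperty(name = "Kafka Offset Post Processor Configs",
       groupName = ConfigGroups.Names.DELTA_STREAMER,
       description = "Configs that control appending Kafka offset metadata columns to the source schema.")
   public static class Config {
     public static final ConfigProperty<String> KAFKA_APPEND_OFFSETS = ConfigProperty
         .key("hoodie.deltastreamer.source.kafka.append.offsets")  // assumed key name
         .defaultValue("false")
         .withDocumentation("When enabled, appends Kafka offset, partition and timestamp columns to each record.");
   }
   ```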



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java:
##########
@@ -50,14 +53,18 @@ public class AvroKafkaSource extends KafkaSource<GenericRecord> {
   public static final String KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX = "hoodie.deltastreamer.source.kafka.value.deserializer.";
   public static final String KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA = KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX + "schema";
   private final String deserializerClassName;
+  protected final SchemaProvider schemaProviderWithoutOffsets;

Review Comment:
   Let's name this originalSchemaProvider and add javadocs calling out that this may not have the Kafka offsets.
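   A minimal sketch of the suggested rename and javadoc:
   
   ```java
   /**
    * The schema provider as originally supplied to this source, i.e. before any
    * KafkaOffsetPostProcessor wrapping, so its schema may not contain the Kafka offset columns.
    */
   protected final SchemaProvider originalSchemaProvider;
   ```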



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
+import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.streaming.kafka010.KafkaTestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+
+public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness {
+  protected static final String TEST_TOPIC_PREFIX = "hoodie_avro_test_";
+
+  protected static KafkaTestUtils testUtils;
+
+  protected static HoodieTestDataGenerator dataGen;
+
+  protected static String SCHEMA_PATH = "/tmp/schema_file.avsc";
+
+  protected final HoodieDeltaStreamerMetrics metrics = mock(HoodieDeltaStreamerMetrics.class);
+
+  protected SchemaProvider schemaProvider;
+
+  @BeforeAll
+  public static void initClass() {
+    testUtils = new KafkaTestUtils();
+    dataGen = new HoodieTestDataGenerator(0xDEED);
+    testUtils.setup();
+  }
+
+  @AfterAll
+  public static void cleanupClass() {
+    testUtils.teardown();
+  }
+
+  protected TypedProperties createPropsForKafkaSource(String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy) {
+    TypedProperties props = new TypedProperties();
+    props.setProperty("hoodie.deltastreamer.source.kafka.topic", topic);
+    props.setProperty("bootstrap.servers", testUtils.brokerAddress());
+    props.setProperty("auto.offset.reset", resetStrategy);
+    props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+    props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
+        maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) :
+            String.valueOf(KafkaOffsetGen.Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP.defaultValue()));
+    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+    return props;
+  }
+
+  void sendMessagesToKafka(String topic, int count, int numPartitions) {
+    List<GenericRecord> genericRecords = dataGen.generateGenericRecords(count);
+    Properties config = getProducerProperties();
+    try (Producer<String, byte[]> producer = new KafkaProducer<>(config)) {
+      for (int i = 0; i < genericRecords.size(); i++) {
+        // use consistent keys to get even spread over partitions for test expectations
+        producer.send(new ProducerRecord<>(topic, Integer.toString(i % numPartitions), HoodieAvroUtils.avroToBytes(genericRecords.get(i))));
+      }
+    }
+  }
+
+  private Properties getProducerProperties() {
+    Properties props = new Properties();
+    props.put("bootstrap.servers", testUtils.brokerAddress());
+    props.put("value.serializer", ByteArraySerializer.class.getName());
+    props.put("value.deserializer", ByteArraySerializer.class.getName());
+    // Key serializer is required.
+    props.put("key.serializer", StringSerializer.class.getName());
+    props.put("auto.register.schemas", "false");
+    // wait for all in-sync replicas to ack sends
+    props.put("acks", "all");
+    return props;
+  }
+
+  @Test
+  public void testAppendKafkaOffsets() throws IOException {
+    UtilitiesTestBase.Helpers.saveStringsToDFS(new String[] {dataGen.generateGenericRecord().getSchema().toString()}, fs(), SCHEMA_PATH);
+    ConsumerRecord<Object, Object> recordConsumerRecord = new ConsumerRecord<Object,Object>("test", 0, 1L,
+        "test", dataGen.generateGenericRecord());
+    JavaRDD<ConsumerRecord<Object, Object>> rdd = jsc().parallelize(Arrays.asList(recordConsumerRecord));
+    TypedProperties props = new TypedProperties();
+    props.put("hoodie.deltastreamer.source.kafka.topic", "test");
+    props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", SCHEMA_PATH);
+    SchemaProvider schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(
+        UtilHelpers.createSchemaProvider(FilebasedSchemaProvider.class.getName(), props, jsc()), props, jsc(), new ArrayList<>());
+
+    AvroKafkaSource avroKafkaSource = new AvroKafkaSource(props, jsc(), spark(), schemaProvider, null);
+    GenericRecord withoutKafkaOffsets = avroKafkaSource.maybeAppendKafkaOffsets(rdd).collect().get(0);
+
+    props.put(KafkaOffsetPostProcessor.Config.KAFKA_APPEND_OFFSETS.key(), "true");
+    schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(
+        UtilHelpers.createSchemaProvider(FilebasedSchemaProvider.class.getName(), props, jsc()), props, jsc(), new ArrayList<>());
+    avroKafkaSource = new AvroKafkaSource(props, jsc(), spark(), schemaProvider, null);
+    GenericRecord withKafkaOffsets = avroKafkaSource.maybeAppendKafkaOffsets(rdd).collect().get(0);
+    assertEquals(3,withKafkaOffsets.getSchema().getFields().size() - withoutKafkaOffsets.getSchema().getFields().size());
+  }
+
+  @Test
+  public void testAppendKafkaOffsetsSourceFormatAdapter() throws IOException {
+    UtilitiesTestBase.Helpers.saveStringsToDFS(new String[] {dataGen.generateGenericRecord().getSchema().toString()}, fs(), SCHEMA_PATH);
+    final String topic = TEST_TOPIC_PREFIX + "testKafkaOffsetAppend";
+    TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
+
+    props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", SCHEMA_PATH);
+    SchemaProvider schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(
+        UtilHelpers.createSchemaProvider(FilebasedSchemaProvider.class.getName(), props, jsc()), props, jsc(), new ArrayList<>());
+
+    props.put("hoodie.deltastreamer.source.kafka.value.deserializer.class", ByteArrayDeserializer.class.getName());
+    sendMessagesToKafka(topic, 1, 2);
+    AvroKafkaSource avroKafkaSource = new AvroKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
+    SourceFormatAdapter kafkaSource = new SourceFormatAdapter(avroKafkaSource);
+    List<String> columns = Arrays.stream(kafkaSource.fetchNewDataInRowFormat(Option.empty(),Long.MAX_VALUE)
+        .getBatch().get().columns()).collect(Collectors.toList());
+    props.put(KafkaOffsetPostProcessor.Config.KAFKA_APPEND_OFFSETS.key(), "true");
+
+    schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(
+        UtilHelpers.createSchemaProvider(FilebasedSchemaProvider.class.getName(), props, jsc()), props, jsc(), new ArrayList<>());
+    avroKafkaSource = new AvroKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
+    kafkaSource = new SourceFormatAdapter(avroKafkaSource);
+    List<String> withKafkaOffsetColumns = Arrays.stream(kafkaSource.fetchNewDataInRowFormat(Option.empty(),Long.MAX_VALUE)
+        .getBatch().get().columns()).collect(Collectors.toList());
+
+    assertEquals(3, withKafkaOffsetColumns.size() - columns.size());
+    List<String> appendList = Arrays.asList(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN);
+    assertEquals(appendList, withKafkaOffsetColumns.subList(withKafkaOffsetColumns.size() - 3, withKafkaOffsetColumns.size()));

Review Comment:
   Can we assert on the partition and topic values as well?
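   For example, something like the sketch below (it assumes a static import of org.apache.spark.sql.functions.col plus the Dataset/Row imports, and that the appended partition ids fall within the 2-partition test topic's range):
   
   ```java
   Dataset<Row> batch = kafkaSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE).getBatch().get();
   // every appended partition id should be a valid partition of the 2-partition test topic
   assertEquals(batch.count(), batch.filter(col(KAFKA_SOURCE_PARTITION_COLUMN).between(0, 1)).count());
   ```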



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java:
##########
@@ -175,4 +185,29 @@ void sendMessagesToKafka(String topic, int count, int numPartitions) {
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", count)));
   }
+
+  @Test
+  public void testAppendKafkaOffset() {
+    final String topic = TEST_TOPIC_PREFIX + "testKafkaOffsetAppend";
+    testUtils.createTopic(topic, 2);
+    sendMessagesToKafka(topic, 10, 2);
+
+    TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
+    Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
+    SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
+    List<String> columns = Arrays.stream(kafkaSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE)
+        .getBatch().get().columns()).collect(Collectors.toList());
+
+    props.put(KafkaOffsetPostProcessor.Config.KAFKA_APPEND_OFFSETS.key(), "true");
+    SchemaProvider postSchemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(
+        schemaProvider, props, jsc(), new ArrayList<>());
+    jsonSource = new JsonKafkaSource(props, jsc(), spark(), postSchemaProvider, metrics);
+    kafkaSource = new SourceFormatAdapter(jsonSource);
+    List<String> withKafkaOffsetColumns = Arrays.stream(kafkaSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE)
+        .getBatch().get().columns()).collect(Collectors.toList());
+
+    assertEquals(3, withKafkaOffsetColumns.size() - columns.size());
+    List<String> appendList = Arrays.asList(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN);
+    assertEquals(appendList, withKafkaOffsetColumns.subList(withKafkaOffsetColumns.size() - 3, withKafkaOffsetColumns.size()));

Review Comment:
   Can we assert on the partition and topic values as well?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java:
##########
@@ -62,6 +63,9 @@ public ProtoKafkaSource(TypedProperties props, JavaSparkContext sparkContext,
 
   @Override
   JavaRDD<Message> toRDD(OffsetRange[] offsetRanges) {
+    if (KafkaOffsetPostProcessor.Config.shouldAddOffsets(props)) {

Review Comment:
   Probably we can do this once in the constructor.
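   i.e. something along these lines (the field name is illustrative and only the relevant fragments are shown):
   
   ```java
   // resolve the flag once instead of re-reading props on every toRDD() call
   private final boolean shouldAddOffsets;
   
   // in the existing ProtoKafkaSource constructor:
   this.shouldAddOffsets = KafkaOffsetPostProcessor.Config.shouldAddOffsets(props);
   
   // then in toRDD():
   if (shouldAddOffsets) {
     // append the Kafka offset / partition / timestamp fields
   }
   ```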



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java:
##########
@@ -101,4 +107,18 @@ public GenericRecord fromProtoMessage(Message message) {
     initSchema();
     return ProtoConversionUtil.convertToAvro(schema, message);
   }
+
+  public GenericRecord withKafkaFieldsAppended(ConsumerRecord consumerRecord) {
+    initSchema();
+    GenericRecord record = (GenericRecord) consumerRecord.value();
+    GenericRecordBuilder recordBuilder = new GenericRecordBuilder(this.schema);

Review Comment:
   this.schema may not have the offset fields, right?
   So how are we adding the additional fields at L 118, 119, 129 when the GenericRecord's own schema does not have these fields?
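   For this to work I'd expect this.schema to be the post-processed (offset-aware) schema, with the original record's fields copied over by name, roughly like the sketch below (just my understanding of the intent, not necessarily what the PR does; the column constants are from KafkaOffsetPostProcessor):
   
   ```java
   public GenericRecord withKafkaFieldsAppended(ConsumerRecord consumerRecord) {
     initSchema();
     GenericRecord record = (GenericRecord) consumerRecord.value();
     // this.schema must already contain the offset columns for the builder to accept them
     GenericRecordBuilder recordBuilder = new GenericRecordBuilder(this.schema);
     for (Schema.Field field : record.getSchema().getFields()) {
       // copy the fields that exist in the incoming record
       recordBuilder.set(field.name(), record.get(field.name()));
     }
     // the extra columns only exist in this.schema and are populated from Kafka metadata
     return recordBuilder
         .set(KAFKA_SOURCE_OFFSET_COLUMN, consumerRecord.offset())
         .set(KAFKA_SOURCE_PARTITION_COLUMN, consumerRecord.partition())
         .set(KAFKA_SOURCE_TIMESTAMP_COLUMN, consumerRecord.timestamp())
         .build();
   }
   ```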



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org