Posted to commits@hudi.apache.org by co...@apache.org on 2023/03/01 02:32:18 UTC

[hudi] branch master updated: [HUDI-5808] Add support for Kafka offsets in JsonKafkaSource and AvroKafkaSource (#8011)

This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a72ca04eeea [HUDI-5808] Add support for Kafka offsets in JsonKafkaSource and AvroKafkaSource (#8011)
a72ca04eeea is described below

commit a72ca04eeea213cf09d649763f097327ee67827f
Author: Jon Vexler <jb...@gmail.com>
AuthorDate: Tue Feb 28 21:32:09 2023 -0500

    [HUDI-5808] Add support for Kafka offsets in JsonKafkaSource and AvroKafkaSource (#8011)
    
    Add support for Kafka offsets in JsonKafkaSource and AvroKafkaSource.
    
    - New config hoodie.deltastreamer.source.kafka.append.offsets; the default is "false".
    - Set it to "true" when JsonKafkaSource or AvroKafkaSource is the source to append the Kafka offset, partition, and timestamp to each record (see the sketch after the changed-file list below).
    
    ---------
    
    Co-authored-by: Jonathan Vexler <=>
    Co-authored-by: sivabalan <n....@gmail.com>
---
 .../org/apache/hudi/utilities/UtilHelpers.java     |  16 +-
 .../utilities/schema/KafkaOffsetPostProcessor.java |  73 ++++++++
 .../hudi/utilities/sources/AvroKafkaSource.java    |  37 +++-
 .../hudi/utilities/sources/JsonKafkaSource.java    |  43 ++++-
 .../apache/hudi/utilities/sources/KafkaSource.java |   5 +-
 .../hudi/utilities/sources/ProtoKafkaSource.java   |   3 +
 .../utilities/sources/helpers/AvroConvertor.java   |  23 +++
 .../deltastreamer/TestHoodieDeltaStreamer.java     |  57 ++++++-
 .../utilities/sources/TestAvroKafkaSource.java     | 188 +++++++++++++++++++++
 .../utilities/sources/TestJsonKafkaSource.java     |  59 +++++--
 .../sources/TestJsonKafkaSourcePostProcessor.java  |   8 +-
 11 files changed, 478 insertions(+), 34 deletions(-)
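
A minimal usage sketch (not taken from the patch; the topic name, schema path, and table path are hypothetical): the new flag is read from the TypedProperties handed to JsonKafkaSource or AvroKafkaSource, and the same key can equally be set in a DeltaStreamer properties file.

    import org.apache.hudi.common.config.TypedProperties;

    TypedProperties props = new TypedProperties();
    props.setProperty("hoodie.deltastreamer.source.kafka.topic", "my_topic");                        // hypothetical topic
    props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", "/tmp/schema.avsc"); // hypothetical path
    // new in this commit: append offset, partition and timestamp metadata to every record
    props.setProperty("hoodie.deltastreamer.source.kafka.append.offsets", "true");

Once a table has been synced with the flag enabled, the appended columns behave like any other field (this sketch assumes an active SparkSession named spark and a hypothetical tableBasePath):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    Dataset<Row> ds = spark.read().format("org.apache.hudi").load(tableBasePath);
    ds.select("_hoodie_kafka_source_partition", "_hoodie_kafka_source_offset", "_hoodie_kafka_source_timestamp").show();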

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index e720ea7817d..bcd889df787 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -54,6 +54,7 @@ import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
 import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
 import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
 import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
+import org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;
 import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaPostProcessor;
 import org.apache.hudi.utilities.schema.SchemaPostProcessor.Config;
@@ -484,7 +485,7 @@ public class UtilHelpers {
     return originalProvider;
   }
 
-  public static SchemaProviderWithPostProcessor wrapSchemaProviderWithPostProcessor(SchemaProvider provider,
+  public static SchemaProvider wrapSchemaProviderWithPostProcessor(SchemaProvider provider,
                                                                                     TypedProperties cfg, JavaSparkContext jssc, List<String> transformerClassNames) {
 
     if (provider == null) {
@@ -492,7 +493,7 @@ public class UtilHelpers {
     }
 
     if (provider instanceof SchemaProviderWithPostProcessor) {
-      return (SchemaProviderWithPostProcessor) provider;
+      return provider;
     }
 
     String schemaPostProcessorClass = cfg.getString(Config.SCHEMA_POST_PROCESSOR_PROP, null);
@@ -503,10 +504,21 @@ public class UtilHelpers {
       schemaPostProcessorClass = SparkAvroPostProcessor.class.getName();
     }
 
+    if (schemaPostProcessorClass == null || schemaPostProcessorClass.isEmpty()) {
+      return provider;
+    }
+
     return new SchemaProviderWithPostProcessor(provider,
         Option.ofNullable(createSchemaPostProcessor(schemaPostProcessorClass, cfg, jssc)));
   }
 
+  public static SchemaProvider getSchemaProviderForKafkaSource(SchemaProvider provider, TypedProperties cfg, JavaSparkContext jssc) {
+    if (KafkaOffsetPostProcessor.Config.shouldAddOffsets(cfg)) {
+      return new SchemaProviderWithPostProcessor(provider, Option.ofNullable(new KafkaOffsetPostProcessor(cfg, jssc)));
+    }
+    return provider;
+  }
+
   public static SchemaProvider createRowBasedSchemaProvider(StructType structType, TypedProperties cfg, JavaSparkContext jssc) {
     SchemaProvider rowSchemaProvider = new RowBasedSchemaProvider(structType);
     return wrapSchemaProviderWithPostProcessor(rowSchemaProvider, cfg, jssc, null);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java
new file mode 100644
index 00000000000..cc0ce2cf35a
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.schema;
+
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.TypedProperties;
+
+import org.apache.avro.Schema;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Used internally to add Kafka offsets. You should probably not set
+ * hoodie.deltastreamer.schemaprovider.schema_post_processor to this class
+ * */
+public class KafkaOffsetPostProcessor extends SchemaPostProcessor {
+
+  public static class Config {
+    public static final ConfigProperty<String> KAFKA_APPEND_OFFSETS = ConfigProperty
+        .key("hoodie.deltastreamer.source.kafka.append.offsets")
+        .defaultValue("false")
+        .withDocumentation("When enabled, appends kafka offset info like source offset(_hoodie_kafka_source_offset), "
+            + "partition (_hoodie_kafka_source_partition) and timestamp (_hoodie_kafka_source_timestamp) to the records. "
+            + "By default its disabled and no kafka offsets are added");
+
+    public static boolean shouldAddOffsets(TypedProperties props) {
+      return  props.getBoolean(KafkaOffsetPostProcessor.Config.KAFKA_APPEND_OFFSETS.key(), Boolean.parseBoolean(KafkaOffsetPostProcessor.Config.KAFKA_APPEND_OFFSETS.defaultValue()));
+    }
+  }
+
+  private static final Logger LOG = LogManager.getLogger(KafkaOffsetPostProcessor.class);
+
+  public static final String KAFKA_SOURCE_OFFSET_COLUMN = "_hoodie_kafka_source_offset";
+  public static final String KAFKA_SOURCE_PARTITION_COLUMN = "_hoodie_kafka_source_partition";
+  public static final String KAFKA_SOURCE_TIMESTAMP_COLUMN = "_hoodie_kafka_source_timestamp";
+
+  public KafkaOffsetPostProcessor(TypedProperties props, JavaSparkContext jssc) {
+    super(props, jssc);
+  }
+
+  @Override
+  public Schema processSchema(Schema schema) {
+    // this method adds kafka offset fields namely source offset, partition and timestamp to the schema of the batch.
+    List<Schema.Field> fieldList = schema.getFields();
+    List<Schema.Field> newFieldList = fieldList.stream()
+        .map(f -> new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal())).collect(Collectors.toList());
+    newFieldList.add(new Schema.Field(KAFKA_SOURCE_OFFSET_COLUMN, Schema.create(Schema.Type.LONG), "offset column", 0));
+    newFieldList.add(new Schema.Field(KAFKA_SOURCE_PARTITION_COLUMN, Schema.create(Schema.Type.INT), "partition column", 0));
+    newFieldList.add(new Schema.Field(KAFKA_SOURCE_TIMESTAMP_COLUMN, Schema.create(Schema.Type.LONG), "timestamp column", 0));
+    Schema newSchema = Schema.createRecord(schema.getName() + "_processed", schema.getDoc(), schema.getNamespace(), false, newFieldList);
+    return newSchema;
+  }
+}
\ No newline at end of file
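
To make processSchema above concrete, a small sketch (assuming a live JavaSparkContext named jssc; the record schema is a made-up example, not one from the repo):

    import org.apache.avro.Schema;
    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;

    Schema source = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Trip\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");
    Schema withOffsets = new KafkaOffsetPostProcessor(new TypedProperties(), jssc).processSchema(source);
    // withOffsets is a record named "Trip_processed" carrying the original fields plus
    // _hoodie_kafka_source_offset (long), _hoodie_kafka_source_partition (int) and _hoodie_kafka_source_timestamp (long)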
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index a20ecbdfbf0..3f7b8752b34 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -22,6 +22,7 @@ import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
 import org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer;
 import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -29,6 +30,7 @@ import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
 
 import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.log4j.LogManager;
@@ -51,13 +53,19 @@ public class AvroKafkaSource extends KafkaSource<GenericRecord> {
   public static final String KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA = KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX + "schema";
   private final String deserializerClassName;
 
+  //other schema provider may have kafka offsets
+  protected final SchemaProvider originalSchemaProvider;
+
   public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
-      SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
-    super(props, sparkContext, sparkSession, schemaProvider, SourceType.AVRO, metrics);
+                         SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
+    super(props, sparkContext, sparkSession,
+        UtilHelpers.getSchemaProviderForKafkaSource(schemaProvider, props, sparkContext),
+        SourceType.AVRO, metrics);
+    this.originalSchemaProvider = schemaProvider;
 
     props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class.getName());
     deserializerClassName = props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(),
-            DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().defaultValue());
+        DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().defaultValue());
 
     try {
       props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(deserializerClassName).getName());
@@ -77,16 +85,31 @@ public class AvroKafkaSource extends KafkaSource<GenericRecord> {
 
   @Override
   JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRanges) {
+    JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD;
     if (deserializerClassName.equals(ByteArrayDeserializer.class.getName())) {
       if (schemaProvider == null) {
         throw new HoodieException("Please provide a valid schema provider class when use ByteArrayDeserializer!");
       }
+
+      //Don't want kafka offsets here so we use originalSchemaProvider
+      AvroConvertor convertor = new AvroConvertor(originalSchemaProvider.getSourceSchema());
+      kafkaRDD = KafkaUtils.<String, byte[]>createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
+          LocationStrategies.PreferConsistent()).map(obj ->
+          new ConsumerRecord<>(obj.topic(), obj.partition(), obj.offset(),
+              obj.key(), convertor.fromAvroBinary(obj.value())));
+    } else {
+      kafkaRDD = KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
+          LocationStrategies.PreferConsistent());
+    }
+    return maybeAppendKafkaOffsets(kafkaRDD);
+  }
+
+  protected JavaRDD<GenericRecord> maybeAppendKafkaOffsets(JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD) {
+    if (this.shouldAddOffsets) {
       AvroConvertor convertor = new AvroConvertor(schemaProvider.getSourceSchema());
-      return KafkaUtils.<String, byte[]>createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
-              LocationStrategies.PreferConsistent()).map(obj -> convertor.fromAvroBinary(obj.value()));
+      return kafkaRDD.map(convertor::withKafkaFieldsAppended);
     } else {
-      return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
-              LocationStrategies.PreferConsistent()).map(obj -> (GenericRecord) obj.value());
+      return kafkaRDD.map(consumerRecord -> (GenericRecord) consumerRecord.value());
     }
   }
 }
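
A hedged note on the branch in toRDD above: when the value deserializer is ByteArrayDeserializer, a schema provider is mandatory, the raw bytes are decoded with the original (offset-free) schema, and maybeAppendKafkaOffsets then rebuilds each record against the offset-augmented schema. A configuration sketch mirroring the new test setup (the schema path is hypothetical):

    props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", "/tmp/schema_file.avsc"); // hypothetical
    props.setProperty("hoodie.deltastreamer.source.kafka.value.deserializer.class",
        org.apache.kafka.common.serialization.ByteArrayDeserializer.class.getName());
    props.setProperty("hoodie.deltastreamer.source.kafka.append.offsets", "true");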
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index 26849d499e9..9e6d4ce4fb2 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -27,6 +27,9 @@ import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
 import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -36,6 +39,12 @@ import org.apache.spark.streaming.kafka010.LocationStrategies;
 import org.apache.spark.streaming.kafka010.OffsetRange;
 
 import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
 
 /**
  * Read json kafka data.
@@ -44,7 +53,9 @@ public class JsonKafkaSource extends KafkaSource<String> {
 
   public JsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession,
                          SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
-    super(properties, sparkContext, sparkSession, schemaProvider, SourceType.JSON, metrics);
+    super(properties, sparkContext, sparkSession,
+        UtilHelpers.getSchemaProviderForKafkaSource(schemaProvider, properties, sparkContext),
+        SourceType.JSON, metrics);
     properties.put("key.deserializer", StringDeserializer.class.getName());
     properties.put("value.deserializer", StringDeserializer.class.getName());
     this.offsetGen = new KafkaOffsetGen(props);
@@ -52,13 +63,35 @@ public class JsonKafkaSource extends KafkaSource<String> {
 
   @Override
   JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {
-    JavaRDD<String> jsonStringRDD = KafkaUtils.createRDD(sparkContext,
+    JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD = KafkaUtils.createRDD(sparkContext,
             offsetGen.getKafkaParams(),
             offsetRanges,
             LocationStrategies.PreferConsistent())
-        .filter(x -> !StringUtils.isNullOrEmpty((String) x.value()))
-        .map(x -> x.value().toString());
-    return postProcess(jsonStringRDD);
+        .filter(x -> !StringUtils.isNullOrEmpty((String) x.value()));
+    return postProcess(maybeAppendKafkaOffsets(kafkaRDD));
+  }
+
+  protected  JavaRDD<String> maybeAppendKafkaOffsets(JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD) {
+    if (this.shouldAddOffsets) {
+      return kafkaRDD.mapPartitions(partitionIterator -> {
+        List<String> stringList = new LinkedList<>();
+        ObjectMapper om = new ObjectMapper();
+        partitionIterator.forEachRemaining(consumerRecord -> {
+          String record = consumerRecord.value().toString();
+          try {
+            ObjectNode jsonNode = (ObjectNode) om.readTree(record);
+            jsonNode.put(KAFKA_SOURCE_OFFSET_COLUMN, consumerRecord.offset());
+            jsonNode.put(KAFKA_SOURCE_PARTITION_COLUMN, consumerRecord.partition());
+            jsonNode.put(KAFKA_SOURCE_TIMESTAMP_COLUMN, consumerRecord.timestamp());
+            stringList.add(om.writeValueAsString(jsonNode));
+          } catch (Throwable e) {
+            stringList.add(record);
+          }
+        });
+        return stringList.iterator();
+      });
+    }
+    return kafkaRDD.map(consumerRecord -> (String) consumerRecord.value());
   }
 
   private JavaRDD<String> postProcess(JavaRDD<String> jsonStringRDD) {
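
For reference, a sketch of the JSON shape that maybeAppendKafkaOffsets produces when the flag is on (the payload field and values are made up):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ObjectNode;

    ObjectMapper om = new ObjectMapper();
    ObjectNode node = om.createObjectNode();
    node.put("rider", "rider-000");                         // made-up payload field
    node.put("_hoodie_kafka_source_offset", 42L);
    node.put("_hoodie_kafka_source_partition", 0);
    node.put("_hoodie_kafka_source_timestamp", 1677638400000L);
    // om.writeValueAsString(node) (declared to throw JsonProcessingException) yields:
    // {"rider":"rider-000","_hoodie_kafka_source_offset":42,"_hoodie_kafka_source_partition":0,"_hoodie_kafka_source_timestamp":1677638400000}

If a record cannot be parsed as JSON, the code above falls back to passing the original string through unchanged.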
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
index 5561356cabc..0fb94a1919c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
 import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException;
+import org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
 
@@ -42,12 +43,14 @@ abstract class KafkaSource<T> extends Source<JavaRDD<T>> {
   protected final SchemaProvider schemaProvider;
   protected KafkaOffsetGen offsetGen;
 
+  protected final boolean shouldAddOffsets;
+
   protected KafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
                         SchemaProvider schemaProvider, SourceType sourceType, HoodieDeltaStreamerMetrics metrics) {
     super(props, sparkContext, sparkSession, schemaProvider, sourceType);
-
     this.schemaProvider = schemaProvider;
     this.metrics = metrics;
+    this.shouldAddOffsets = KafkaOffsetPostProcessor.Config.shouldAddOffsets(props);
   }
 
   @Override
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
index 1ed057e322d..7effb0aa543 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
@@ -58,6 +58,9 @@ public class ProtoKafkaSource extends KafkaSource<Message> {
     props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, ByteArrayDeserializer.class);
     className = props.getString(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME.key());
     this.offsetGen = new KafkaOffsetGen(props);
+    if (this.shouldAddOffsets) {
+      throw new HoodieException("Appending kafka offsets to ProtoKafkaSource is not supported");
+    }
   }
 
   @Override
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
index ee160e19d52..3906d0ce7cc 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
@@ -25,9 +25,15 @@ import com.twitter.bijection.Injection;
 import com.twitter.bijection.avro.GenericAvroCodecs;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 
 import java.io.Serializable;
 
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
+
 import scala.util.Either;
 import scala.util.Left;
 import scala.util.Right;
@@ -115,4 +121,21 @@ public class AvroConvertor implements Serializable {
     initSchema();
     return ProtoConversionUtil.convertToAvro(schema, message);
   }
+
+  /**
+   * this.schema is required to have kafka offsets for this to work
+   */
+  public GenericRecord withKafkaFieldsAppended(ConsumerRecord consumerRecord) {
+    initSchema();
+    GenericRecord record = (GenericRecord) consumerRecord.value();
+    GenericRecordBuilder recordBuilder = new GenericRecordBuilder(this.schema);
+    for (Schema.Field field :  record.getSchema().getFields()) {
+      recordBuilder.set(field, record.get(field.name()));
+    }
+    recordBuilder.set(KAFKA_SOURCE_OFFSET_COLUMN, consumerRecord.offset());
+    recordBuilder.set(KAFKA_SOURCE_PARTITION_COLUMN, consumerRecord.partition());
+    recordBuilder.set(KAFKA_SOURCE_TIMESTAMP_COLUMN, consumerRecord.timestamp());
+    return recordBuilder.build();
+  }
+
 }
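
A minimal sketch of the new helper (assumes schemaWithOffsets has already been run through KafkaOffsetPostProcessor and payload is a GenericRecord matching the original schema; the topic, partition, offset and key below are made-up values):

    import org.apache.avro.generic.GenericRecord;
    import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    AvroConvertor convertor = new AvroConvertor(schemaWithOffsets);
    ConsumerRecord<Object, Object> rec = new ConsumerRecord<>("my_topic", 0, 42L, "key", payload);
    GenericRecord withOffsets = convertor.withKafkaFieldsAppended(rec);
    // withOffsets copies every field of payload and fills _hoodie_kafka_source_offset = 42,
    // _hoodie_kafka_source_partition = 0 and _hoodie_kafka_source_timestamp = rec.timestamp()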
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 61d03bc0b58..29d8c9bbe78 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -67,6 +67,7 @@ import org.apache.hudi.utilities.DummySchemaProvider;
 import org.apache.hudi.utilities.HoodieClusteringJob;
 import org.apache.hudi.utilities.HoodieIndexer;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
 import org.apache.hudi.utilities.sources.CsvDFSSource;
@@ -123,6 +124,7 @@ import org.junit.jupiter.params.provider.ValueSource;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -148,6 +150,9 @@ import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
 import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
 import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
 import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -1667,9 +1672,13 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
   }
 
   private static void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName) {
+    prepareJsonKafkaDFSFiles(numRecords, createTopic, topicName, 2);
+  }
+
+  private static void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName, int numPartitions) {
     if (createTopic) {
       try {
-        testUtils.createTopic(topicName, 2);
+        testUtils.createTopic(topicName, numPartitions);
       } catch (TopicExistsException e) {
         // no op
       }
@@ -1804,10 +1813,10 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
   }
 
   private void prepareJsonKafkaDFSSource(String propsFileName, String autoResetValue, String topicName) throws IOException {
-    prepareJsonKafkaDFSSource(propsFileName, autoResetValue, topicName, null);
+    prepareJsonKafkaDFSSource(propsFileName, autoResetValue, topicName, null, false);
   }
 
-  private void prepareJsonKafkaDFSSource(String propsFileName, String autoResetValue, String topicName, Map<String,String> extraProps) throws IOException {
+  private void prepareJsonKafkaDFSSource(String propsFileName, String autoResetValue, String topicName, Map<String,String> extraProps, boolean shouldAddOffsets) throws IOException {
     // Properties used for testing delta-streamer with JsonKafka source
     TypedProperties props = new TypedProperties();
     populateAllCommonProps(props, basePath, testUtils.brokerAddress());
@@ -1824,7 +1833,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     if (extraProps != null && !extraProps.isEmpty()) {
       extraProps.forEach(props::setProperty);
     }
-
+    props.setProperty(KafkaOffsetPostProcessor.Config.KAFKA_APPEND_OFFSETS.key(), Boolean.toString(shouldAddOffsets));
     UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, basePath + "/" + propsFileName);
   }
 
@@ -1895,6 +1904,44 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     TestHelpers.assertRecordCount(totalRecords, tableBasePath, sqlContext);
   }
 
+  @Test
+  public void testJsonKafkaDFSSourceWithOffsets() throws Exception {
+    topicName = "topic" + testNum;
+    int numRecords = 15;
+    int numPartitions = 3;
+    int recsPerPartition = numRecords / numPartitions;
+    long beforeTime = Instant.now().toEpochMilli();
+    prepareJsonKafkaDFSFiles(numRecords, true, topicName, numPartitions);
+    prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest", topicName, null, true);
+    String tableBasePath = basePath + "/test_json_kafka_offsets_table" + testNum;
+    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+        TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
+            Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
+            true, 100000, false, null, null, "timestamp", null), jsc);
+    deltaStreamer.sync();
+    sqlContext.clearCache();
+    Dataset<Row> ds = sqlContext.read().format("org.apache.hudi").load(tableBasePath);
+    assertEquals(numRecords, ds.count());
+    //ensure that kafka partition column exists and is populated correctly
+    for (int i = 0; i < numPartitions; i++) {
+      assertEquals(recsPerPartition, ds.filter(KAFKA_SOURCE_PARTITION_COLUMN + "=" + i).count());
+    }
+
+    //ensure that kafka timestamp column exists and is populated correctly
+    long afterTime = Instant.now().toEpochMilli();
+    assertEquals(numRecords, ds.filter(KAFKA_SOURCE_TIMESTAMP_COLUMN + ">" + beforeTime).filter(KAFKA_SOURCE_TIMESTAMP_COLUMN + "<" + afterTime).count());
+
+
+    //ensure that kafka offset column exists and is populated correctly
+    sqlContext.read().format("org.apache.hudi").load(tableBasePath).col(KAFKA_SOURCE_OFFSET_COLUMN);
+    for (int i = 0; i < recsPerPartition; i++) {
+      for (int j = 0; j < numPartitions; j++) {
+        //each offset partition pair should be unique
+        assertEquals(1, ds.filter(KAFKA_SOURCE_OFFSET_COLUMN + "=" + i).filter(KAFKA_SOURCE_PARTITION_COLUMN + "=" + j).count());
+      }
+    }
+  }
+
   @Test
   public void testKafkaTimestampType() throws Exception {
     topicName = "topic" + testNum;
@@ -1948,7 +1995,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     prepareJsonKafkaDFSFiles(20, true, topicName);
     Map<String, String> kafkaExtraProps = new HashMap<>();
     kafkaExtraProps.put(MUTLI_WRITER_SOURCE_CHECKPOINT_ID.key(), "kafka");
-    prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest", topicName, kafkaExtraProps);
+    prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest", topicName, kafkaExtraProps, false);
     // delta streamer w/ json kafka source
     HoodieDeltaStreamer kafkaDs = new HoodieDeltaStreamer(
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
new file mode 100644
index 00000000000..cb81ad605ed
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
+import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.streaming.kafka010.KafkaTestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+
+public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness {
+  protected static final String TEST_TOPIC_PREFIX = "hoodie_avro_test_";
+
+  protected static KafkaTestUtils testUtils;
+
+  protected static HoodieTestDataGenerator dataGen;
+
+  protected static String SCHEMA_PATH = "/tmp/schema_file.avsc";
+
+  protected final HoodieDeltaStreamerMetrics metrics = mock(HoodieDeltaStreamerMetrics.class);
+
+  protected SchemaProvider schemaProvider;
+
+  @BeforeAll
+  public static void initClass() {
+    testUtils = new KafkaTestUtils();
+    dataGen = new HoodieTestDataGenerator(0xDEED);
+    testUtils.setup();
+  }
+
+  @AfterAll
+  public static void cleanupClass() {
+    testUtils.teardown();
+  }
+
+  protected TypedProperties createPropsForKafkaSource(String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy) {
+    TypedProperties props = new TypedProperties();
+    props.setProperty("hoodie.deltastreamer.source.kafka.topic", topic);
+    props.setProperty("bootstrap.servers", testUtils.brokerAddress());
+    props.setProperty("auto.offset.reset", resetStrategy);
+    props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+    props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
+        maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) :
+            String.valueOf(KafkaOffsetGen.Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP.defaultValue()));
+    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+    return props;
+  }
+
+  void sendMessagesToKafka(String topic, int count, int numPartitions) {
+    List<GenericRecord> genericRecords = dataGen.generateGenericRecords(count);
+    Properties config = getProducerProperties();
+    try (Producer<String, byte[]> producer = new KafkaProducer<>(config)) {
+      for (int i = 0; i < genericRecords.size(); i++) {
+        // use consistent keys to get even spread over partitions for test expectations
+        producer.send(new ProducerRecord<>(topic, i % numPartitions, "key", HoodieAvroUtils.avroToBytes(genericRecords.get(i))));
+      }
+    }
+  }
+
+  private Properties getProducerProperties() {
+    Properties props = new Properties();
+    props.put("bootstrap.servers", testUtils.brokerAddress());
+    props.put("value.serializer", ByteArraySerializer.class.getName());
+    props.put("value.deserializer", ByteArraySerializer.class.getName());
+    // Key serializer is required.
+    props.put("key.serializer", StringSerializer.class.getName());
+    props.put("auto.register.schemas", "false");
+    // wait for all in-sync replicas to ack sends
+    props.put("acks", "all");
+    return props;
+  }
+
+  @Test
+  public void testAppendKafkaOffsets() throws IOException {
+    UtilitiesTestBase.Helpers.saveStringsToDFS(new String[] {dataGen.generateGenericRecord().getSchema().toString()}, fs(), SCHEMA_PATH);
+    ConsumerRecord<Object, Object> recordConsumerRecord = new ConsumerRecord<Object,Object>("test", 0, 1L,
+        "test", dataGen.generateGenericRecord());
+    JavaRDD<ConsumerRecord<Object, Object>> rdd = jsc().parallelize(Arrays.asList(recordConsumerRecord));
+    TypedProperties props = new TypedProperties();
+    props.put("hoodie.deltastreamer.source.kafka.topic", "test");
+    props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", SCHEMA_PATH);
+    SchemaProvider schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(
+        UtilHelpers.createSchemaProvider(FilebasedSchemaProvider.class.getName(), props, jsc()), props, jsc(), new ArrayList<>());
+
+    AvroKafkaSource avroKafkaSource = new AvroKafkaSource(props, jsc(), spark(), schemaProvider, null);
+    GenericRecord withoutKafkaOffsets = avroKafkaSource.maybeAppendKafkaOffsets(rdd).collect().get(0);
+
+    props.put(KafkaOffsetPostProcessor.Config.KAFKA_APPEND_OFFSETS.key(), "true");
+    schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(
+        UtilHelpers.createSchemaProvider(FilebasedSchemaProvider.class.getName(), props, jsc()), props, jsc(), new ArrayList<>());
+    avroKafkaSource = new AvroKafkaSource(props, jsc(), spark(), schemaProvider, null);
+    GenericRecord withKafkaOffsets = avroKafkaSource.maybeAppendKafkaOffsets(rdd).collect().get(0);
+    assertEquals(3,withKafkaOffsets.getSchema().getFields().size() - withoutKafkaOffsets.getSchema().getFields().size());
+  }
+
+  @Test
+  public void testAppendKafkaOffsetsSourceFormatAdapter() throws IOException {
+    UtilitiesTestBase.Helpers.saveStringsToDFS(new String[] {dataGen.generateGenericRecord().getSchema().toString()}, fs(), SCHEMA_PATH);
+    final String topic = TEST_TOPIC_PREFIX + "testKafkaOffsetAppend";
+    TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
+
+    props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", SCHEMA_PATH);
+    SchemaProvider schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(
+        UtilHelpers.createSchemaProvider(FilebasedSchemaProvider.class.getName(), props, jsc()), props, jsc(), new ArrayList<>());
+
+    props.put("hoodie.deltastreamer.source.kafka.value.deserializer.class", ByteArrayDeserializer.class.getName());
+    int numPartitions = 3;
+    int numMessages = 15;
+    testUtils.createTopic(topic,numPartitions);
+    sendMessagesToKafka(topic, numMessages, numPartitions);
+    AvroKafkaSource avroKafkaSource = new AvroKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
+    SourceFormatAdapter kafkaSource = new SourceFormatAdapter(avroKafkaSource);
+    Dataset<Row> c = kafkaSource.fetchNewDataInRowFormat(Option.empty(),Long.MAX_VALUE)
+        .getBatch().get();
+    List<String> columns = Arrays.stream(c.columns()).collect(Collectors.toList());
+    props.put(KafkaOffsetPostProcessor.Config.KAFKA_APPEND_OFFSETS.key(), "true");
+
+    schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(
+        UtilHelpers.createSchemaProvider(FilebasedSchemaProvider.class.getName(), props, jsc()), props, jsc(), new ArrayList<>());
+    avroKafkaSource = new AvroKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
+    kafkaSource = new SourceFormatAdapter(avroKafkaSource);
+    Dataset<Row>  d = kafkaSource.fetchNewDataInRowFormat(Option.empty(),Long.MAX_VALUE).getBatch().get();
+    assertEquals(numMessages, d.count());
+    for (int i = 0; i < numPartitions; i++) {
+      assertEquals(numMessages / numPartitions, d.filter("_hoodie_kafka_source_partition=" + i).collectAsList().size());
+    }
+    List<String> withKafkaOffsetColumns = Arrays.stream(d.columns()).collect(Collectors.toList());
+    assertEquals(0, d.drop(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN,"city_to_state").except(c.drop("city_to_state")).count());
+    assertEquals(3, withKafkaOffsetColumns.size() - columns.size());
+    List<String> appendList = Arrays.asList(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN);
+    assertEquals(appendList, withKafkaOffsetColumns.subList(withKafkaOffsetColumns.size() - 3, withKafkaOffsetColumns.size()));
+  }
+}
\ No newline at end of file
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index 9e6e5aad661..6e2f4d6e73b 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -29,6 +30,7 @@ import org.apache.hudi.utilities.deltastreamer.BaseErrorTableWriter;
 import org.apache.hudi.utilities.deltastreamer.ErrorEvent;
 import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -44,17 +46,22 @@ import org.junit.jupiter.api.Test;
 import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 import scala.Tuple2;
 
 import static org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TABLE_BASE_PATH;
 import static org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TARGET_TABLE;
 import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
 import static org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecords;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -63,8 +70,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
  */
 public class TestJsonKafkaSource extends BaseTestKafkaSource {
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-  private static final HoodieTestDataGenerator DATA_GENERATOR = new HoodieTestDataGenerator(1L);
-  static final URL SCHEMA_FILE_URL = TestJsonKafkaSource.class.getClassLoader().getResource("delta-streamer-config/source.avsc");
+  static final URL SCHEMA_FILE_URL = TestJsonKafkaSource.class.getClassLoader().getResource("delta-streamer-config/source_short_trip_uber.avsc");
 
   @BeforeEach
   public void init() throws Exception {
@@ -112,7 +118,7 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
     // 1. Extract without any checkpoint => get all the data, respecting sourceLimit
     assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
     // Send  1000 non-null messages to Kafka
-    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 1000, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
     // Send  100 null messages to Kafka
     testUtils.sendMessages(topic, new String[100]);
     InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
@@ -135,12 +141,13 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
     1. Extract without any checkpoint => get all the data, respecting default upper cap since both sourceLimit and
     maxEventsFromKafkaSourceProp are set to Long.MAX_VALUE
      */
-    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    List<HoodieRecord> send1 = dataGenerator.generateInsertsAsPerSchema("000", 1000, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA);
+    testUtils.sendMessages(topic, jsonifyRecords(send1));
     InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
     assertEquals(1000, fetch1.getBatch().get().count());
 
     // 2. Produce new data, extract new data based on sourceLimit
-    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("001", 1000, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
     InputBatch<Dataset<Row>> fetch2 =
         kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), 1500);
     assertEquals(1000, fetch2.getBatch().get().count());
@@ -158,12 +165,12 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
 
     // 1. Extract without any checkpoint => get all the data, respecting sourceLimit
-    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 1000, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
     InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900);
     assertEquals(900, fetch1.getBatch().get().count());
 
     // 2. Produce new data, extract new data based on upper cap
-    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("001", 1000, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
     InputBatch<Dataset<Row>> fetch2 =
         kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
     assertEquals(500, fetch2.getBatch().get().count());
@@ -193,13 +200,14 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
   @Override
   void sendMessagesToKafka(String topic, int count, int numPartitions) {
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", count)));
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", count, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
   }
 
   void sendJsonSafeMessagesToKafka(String topic, int count, int numPartitions) {
     try {
       Tuple2<String, String>[] keyValues = new Tuple2[count];
-      String[] records = jsonifyRecords(DATA_GENERATOR.generateInserts("000", count));
+      HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+      String[] records = jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", count, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA));
       for (int i = 0; i < count; i++) {
         // Drop fields that don't translate to json properly
         Map node = OBJECT_MAPPER.readValue(records[i], Map.class);
@@ -256,7 +264,8 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
     TopicPartition topicPartition1 = new TopicPartition(topic, 1);
     topicPartitions.add(topicPartition1);
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 1000,
+        HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
     testUtils.sendMessages(topic, new String[]{"error_event1", "error_event2"});
 
     TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
@@ -295,4 +304,34 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
       }
     };
   }
+  
+  @Test
+  public void testAppendKafkaOffset() {
+    final String topic = TEST_TOPIC_PREFIX + "testKafkaOffsetAppend";
+    int numPartitions = 3;
+    int numMessages = 15;
+    testUtils.createTopic(topic, numPartitions);
+    sendMessagesToKafka(topic, numMessages, numPartitions);
+
+    TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
+    Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
+    SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
+    Dataset<Row> c = kafkaSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE).getBatch().get();
+    assertEquals(numMessages, c.count());
+    List<String> columns = Arrays.stream(c.columns()).collect(Collectors.toList());
+
+    props.put(KafkaOffsetPostProcessor.Config.KAFKA_APPEND_OFFSETS.key(), "true");
+    jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
+    kafkaSource = new SourceFormatAdapter(jsonSource);
+    Dataset<Row> d = kafkaSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE).getBatch().get();
+    assertEquals(numMessages, d.count());
+    for (int i = 0; i < numPartitions; i++) {
+      assertEquals(numMessages / numPartitions, d.filter("_hoodie_kafka_source_partition=" + i).collectAsList().size());
+    }
+    assertEquals(0, d.drop(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN).except(c).count());
+    List<String> withKafkaOffsetColumns = Arrays.stream(d.columns()).collect(Collectors.toList());
+    assertEquals(3, withKafkaOffsetColumns.size() - columns.size());
+    List<String> appendList = Arrays.asList(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN);
+    assertEquals(appendList, withKafkaOffsetColumns.subList(withKafkaOffsetColumns.size() - 3, withKafkaOffsetColumns.size()));
+  }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java
index 2c1ab1f1c19..d02faabc3e8 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java
@@ -95,7 +95,7 @@ public class TestJsonKafkaSourcePostProcessor extends SparkClientFunctionalTestH
     Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
 
-    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 1000, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
     InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900);
 
     assertEquals(900, fetch1.getBatch().get().count());
@@ -116,7 +116,7 @@ public class TestJsonKafkaSourcePostProcessor extends SparkClientFunctionalTestH
     Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
 
-    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 1000, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
     InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900);
 
     assertNotEquals(900, fetch1.getBatch().get().count());
@@ -136,7 +136,7 @@ public class TestJsonKafkaSourcePostProcessor extends SparkClientFunctionalTestH
 
     Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
-    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 1000, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
 
     Assertions.assertThrows(HoodieSourcePostProcessException.class,
         () -> kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900));
@@ -158,7 +158,7 @@ public class TestJsonKafkaSourcePostProcessor extends SparkClientFunctionalTestH
     Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
 
-    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 1000, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
     InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900);
 
     assertEquals(0, fetch1.getBatch().get().count());