You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by co...@apache.org on 2023/03/01 02:32:18 UTC
[hudi] branch master updated: [HUDI-5808] Add support for Kafka offsets in JsonKafkaSource and AvroKafkaSource (#8011)
This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a72ca04eeea [HUDI-5808] Add support for Kafka offsets in JsonKafkaSource and AvroKafkaSource (#8011)
a72ca04eeea is described below
commit a72ca04eeea213cf09d649763f097327ee67827f
Author: Jon Vexler <jb...@gmail.com>
AuthorDate: Tue Feb 28 21:32:09 2023 -0500
[HUDI-5808] Add support for Kafka offsets in JsonKafkaSource and AvroKafkaSource (#8011)
Add support for Kafka offsets in JsonKafkaSource and AvroKafkaSource.
- New config hoodie.deltastreamer.source.kafka.append.offsets. Default is "false".
- Set to "true" with JsonKafkaSource or AvroKafkaSource as your source to append Kafka offset metadata to the records.
---------
Co-authored-by: Jonathan Vexler <=>
Co-authored-by: sivabalan <n....@gmail.com>
---
.../org/apache/hudi/utilities/UtilHelpers.java | 16 +-
.../utilities/schema/KafkaOffsetPostProcessor.java | 73 ++++++++
.../hudi/utilities/sources/AvroKafkaSource.java | 37 +++-
.../hudi/utilities/sources/JsonKafkaSource.java | 43 ++++-
.../apache/hudi/utilities/sources/KafkaSource.java | 5 +-
.../hudi/utilities/sources/ProtoKafkaSource.java | 3 +
.../utilities/sources/helpers/AvroConvertor.java | 23 +++
.../deltastreamer/TestHoodieDeltaStreamer.java | 57 ++++++-
.../utilities/sources/TestAvroKafkaSource.java | 188 +++++++++++++++++++++
.../utilities/sources/TestJsonKafkaSource.java | 59 +++++--
.../sources/TestJsonKafkaSourcePostProcessor.java | 8 +-
11 files changed, 478 insertions(+), 34 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index e720ea7817d..bcd889df787 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -54,6 +54,7 @@ import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
+import org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaPostProcessor;
import org.apache.hudi.utilities.schema.SchemaPostProcessor.Config;
@@ -484,7 +485,7 @@ public class UtilHelpers {
return originalProvider;
}
- public static SchemaProviderWithPostProcessor wrapSchemaProviderWithPostProcessor(SchemaProvider provider,
+ public static SchemaProvider wrapSchemaProviderWithPostProcessor(SchemaProvider provider,
TypedProperties cfg, JavaSparkContext jssc, List<String> transformerClassNames) {
if (provider == null) {
@@ -492,7 +493,7 @@ public class UtilHelpers {
}
if (provider instanceof SchemaProviderWithPostProcessor) {
- return (SchemaProviderWithPostProcessor) provider;
+ return provider;
}
String schemaPostProcessorClass = cfg.getString(Config.SCHEMA_POST_PROCESSOR_PROP, null);
@@ -503,10 +504,21 @@ public class UtilHelpers {
schemaPostProcessorClass = SparkAvroPostProcessor.class.getName();
}
+ if (schemaPostProcessorClass == null || schemaPostProcessorClass.isEmpty()) {
+ return provider;
+ }
+
return new SchemaProviderWithPostProcessor(provider,
Option.ofNullable(createSchemaPostProcessor(schemaPostProcessorClass, cfg, jssc)));
}
+ public static SchemaProvider getSchemaProviderForKafkaSource(SchemaProvider provider, TypedProperties cfg, JavaSparkContext jssc) {
+ if (KafkaOffsetPostProcessor.Config.shouldAddOffsets(cfg)) {
+ return new SchemaProviderWithPostProcessor(provider, Option.ofNullable(new KafkaOffsetPostProcessor(cfg, jssc)));
+ }
+ return provider;
+ }
+
public static SchemaProvider createRowBasedSchemaProvider(StructType structType, TypedProperties cfg, JavaSparkContext jssc) {
SchemaProvider rowSchemaProvider = new RowBasedSchemaProvider(structType);
return wrapSchemaProviderWithPostProcessor(rowSchemaProvider, cfg, jssc, null);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java
new file mode 100644
index 00000000000..cc0ce2cf35a
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.schema;
+
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.TypedProperties;
+
+import org.apache.avro.Schema;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Used internally to add Kafka offsets. You should probably not set
+ * hoodie.deltastreamer.schemaprovider.schema_post_processor to this class
+ * */
+public class KafkaOffsetPostProcessor extends SchemaPostProcessor {
+
+ public static class Config {
+ public static final ConfigProperty<String> KAFKA_APPEND_OFFSETS = ConfigProperty
+ .key("hoodie.deltastreamer.source.kafka.append.offsets")
+ .defaultValue("false")
+ .withDocumentation("When enabled, appends kafka offset info like source offset(_hoodie_kafka_source_offset), "
+ + "partition (_hoodie_kafka_source_partition) and timestamp (_hoodie_kafka_source_timestamp) to the records. "
+ + "By default its disabled and no kafka offsets are added");
+
+ public static boolean shouldAddOffsets(TypedProperties props) {
+ return props.getBoolean(KafkaOffsetPostProcessor.Config.KAFKA_APPEND_OFFSETS.key(), Boolean.parseBoolean(KafkaOffsetPostProcessor.Config.KAFKA_APPEND_OFFSETS.defaultValue()));
+ }
+ }
+
+ private static final Logger LOG = LogManager.getLogger(KafkaOffsetPostProcessor.class);
+
+ public static final String KAFKA_SOURCE_OFFSET_COLUMN = "_hoodie_kafka_source_offset";
+ public static final String KAFKA_SOURCE_PARTITION_COLUMN = "_hoodie_kafka_source_partition";
+ public static final String KAFKA_SOURCE_TIMESTAMP_COLUMN = "_hoodie_kafka_source_timestamp";
+
+ public KafkaOffsetPostProcessor(TypedProperties props, JavaSparkContext jssc) {
+ super(props, jssc);
+ }
+
+ @Override
+ public Schema processSchema(Schema schema) {
+ // this method adds kafka offset fields namely source offset, partition and timestamp to the schema of the batch.
+ List<Schema.Field> fieldList = schema.getFields();
+ List<Schema.Field> newFieldList = fieldList.stream()
+ .map(f -> new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal())).collect(Collectors.toList());
+ newFieldList.add(new Schema.Field(KAFKA_SOURCE_OFFSET_COLUMN, Schema.create(Schema.Type.LONG), "offset column", 0));
+ newFieldList.add(new Schema.Field(KAFKA_SOURCE_PARTITION_COLUMN, Schema.create(Schema.Type.INT), "partition column", 0));
+ newFieldList.add(new Schema.Field(KAFKA_SOURCE_TIMESTAMP_COLUMN, Schema.create(Schema.Type.LONG), "timestamp column", 0));
+ Schema newSchema = Schema.createRecord(schema.getName() + "_processed", schema.getDoc(), schema.getNamespace(), false, newFieldList);
+ return newSchema;
+ }
+}
\ No newline at end of file
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index a20ecbdfbf0..3f7b8752b34 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -22,6 +22,7 @@ import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer;
import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -29,6 +30,7 @@ import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.log4j.LogManager;
@@ -51,13 +53,19 @@ public class AvroKafkaSource extends KafkaSource<GenericRecord> {
public static final String KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA = KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX + "schema";
private final String deserializerClassName;
+ //other schema provider may have kafka offsets
+ protected final SchemaProvider originalSchemaProvider;
+
public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
- SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
- super(props, sparkContext, sparkSession, schemaProvider, SourceType.AVRO, metrics);
+ SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
+ super(props, sparkContext, sparkSession,
+ UtilHelpers.getSchemaProviderForKafkaSource(schemaProvider, props, sparkContext),
+ SourceType.AVRO, metrics);
+ this.originalSchemaProvider = schemaProvider;
props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class.getName());
deserializerClassName = props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(),
- DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().defaultValue());
+ DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().defaultValue());
try {
props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(deserializerClassName).getName());
@@ -77,16 +85,31 @@ public class AvroKafkaSource extends KafkaSource<GenericRecord> {
@Override
JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRanges) {
+ JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD;
if (deserializerClassName.equals(ByteArrayDeserializer.class.getName())) {
if (schemaProvider == null) {
throw new HoodieException("Please provide a valid schema provider class when use ByteArrayDeserializer!");
}
+
+ //Don't want kafka offsets here so we use originalSchemaProvider
+ AvroConvertor convertor = new AvroConvertor(originalSchemaProvider.getSourceSchema());
+ kafkaRDD = KafkaUtils.<String, byte[]>createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
+ LocationStrategies.PreferConsistent()).map(obj ->
+ new ConsumerRecord<>(obj.topic(), obj.partition(), obj.offset(),
+ obj.key(), convertor.fromAvroBinary(obj.value())));
+ } else {
+ kafkaRDD = KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
+ LocationStrategies.PreferConsistent());
+ }
+ return maybeAppendKafkaOffsets(kafkaRDD);
+ }
+
+ protected JavaRDD<GenericRecord> maybeAppendKafkaOffsets(JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD) {
+ if (this.shouldAddOffsets) {
AvroConvertor convertor = new AvroConvertor(schemaProvider.getSourceSchema());
- return KafkaUtils.<String, byte[]>createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
- LocationStrategies.PreferConsistent()).map(obj -> convertor.fromAvroBinary(obj.value()));
+ return kafkaRDD.map(convertor::withKafkaFieldsAppended);
} else {
- return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
- LocationStrategies.PreferConsistent()).map(obj -> (GenericRecord) obj.value());
+ return kafkaRDD.map(consumerRecord -> (GenericRecord) consumerRecord.value());
}
}
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index 26849d499e9..9e6d4ce4fb2 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -27,6 +27,9 @@ import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -36,6 +39,12 @@ import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;
import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
/**
* Read json kafka data.
@@ -44,7 +53,9 @@ public class JsonKafkaSource extends KafkaSource<String> {
public JsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
- super(properties, sparkContext, sparkSession, schemaProvider, SourceType.JSON, metrics);
+ super(properties, sparkContext, sparkSession,
+ UtilHelpers.getSchemaProviderForKafkaSource(schemaProvider, properties, sparkContext),
+ SourceType.JSON, metrics);
properties.put("key.deserializer", StringDeserializer.class.getName());
properties.put("value.deserializer", StringDeserializer.class.getName());
this.offsetGen = new KafkaOffsetGen(props);
@@ -52,13 +63,35 @@ public class JsonKafkaSource extends KafkaSource<String> {
@Override
JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {
- JavaRDD<String> jsonStringRDD = KafkaUtils.createRDD(sparkContext,
+ JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD = KafkaUtils.createRDD(sparkContext,
offsetGen.getKafkaParams(),
offsetRanges,
LocationStrategies.PreferConsistent())
- .filter(x -> !StringUtils.isNullOrEmpty((String) x.value()))
- .map(x -> x.value().toString());
- return postProcess(jsonStringRDD);
+ .filter(x -> !StringUtils.isNullOrEmpty((String) x.value()));
+ return postProcess(maybeAppendKafkaOffsets(kafkaRDD));
+ }
+
+ protected JavaRDD<String> maybeAppendKafkaOffsets(JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD) {
+ if (this.shouldAddOffsets) {
+ return kafkaRDD.mapPartitions(partitionIterator -> {
+ List<String> stringList = new LinkedList<>();
+ ObjectMapper om = new ObjectMapper();
+ partitionIterator.forEachRemaining(consumerRecord -> {
+ String record = consumerRecord.value().toString();
+ try {
+ ObjectNode jsonNode = (ObjectNode) om.readTree(record);
+ jsonNode.put(KAFKA_SOURCE_OFFSET_COLUMN, consumerRecord.offset());
+ jsonNode.put(KAFKA_SOURCE_PARTITION_COLUMN, consumerRecord.partition());
+ jsonNode.put(KAFKA_SOURCE_TIMESTAMP_COLUMN, consumerRecord.timestamp());
+ stringList.add(om.writeValueAsString(jsonNode));
+ } catch (Throwable e) {
+ stringList.add(record);
+ }
+ });
+ return stringList.iterator();
+ });
+ }
+ return kafkaRDD.map(consumerRecord -> (String) consumerRecord.value());
}
private JavaRDD<String> postProcess(JavaRDD<String> jsonStringRDD) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
index 5561356cabc..0fb94a1919c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException;
+import org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
@@ -42,12 +43,14 @@ abstract class KafkaSource<T> extends Source<JavaRDD<T>> {
protected final SchemaProvider schemaProvider;
protected KafkaOffsetGen offsetGen;
+ protected final boolean shouldAddOffsets;
+
protected KafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider, SourceType sourceType, HoodieDeltaStreamerMetrics metrics) {
super(props, sparkContext, sparkSession, schemaProvider, sourceType);
-
this.schemaProvider = schemaProvider;
this.metrics = metrics;
+ this.shouldAddOffsets = KafkaOffsetPostProcessor.Config.shouldAddOffsets(props);
}
@Override
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
index 1ed057e322d..7effb0aa543 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
@@ -58,6 +58,9 @@ public class ProtoKafkaSource extends KafkaSource<Message> {
props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, ByteArrayDeserializer.class);
className = props.getString(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME.key());
this.offsetGen = new KafkaOffsetGen(props);
+ if (this.shouldAddOffsets) {
+ throw new HoodieException("Appending kafka offsets to ProtoKafkaSource is not supported");
+ }
}
@Override
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
index ee160e19d52..3906d0ce7cc 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
@@ -25,9 +25,15 @@ import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.io.Serializable;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
+
import scala.util.Either;
import scala.util.Left;
import scala.util.Right;
@@ -115,4 +121,21 @@ public class AvroConvertor implements Serializable {
initSchema();
return ProtoConversionUtil.convertToAvro(schema, message);
}
+
+ /**
+ * this.schema is required to have kafka offsets for this to work
+ */
+ public GenericRecord withKafkaFieldsAppended(ConsumerRecord consumerRecord) {
+ initSchema();
+ GenericRecord record = (GenericRecord) consumerRecord.value();
+ GenericRecordBuilder recordBuilder = new GenericRecordBuilder(this.schema);
+ for (Schema.Field field : record.getSchema().getFields()) {
+ recordBuilder.set(field, record.get(field.name()));
+ }
+ recordBuilder.set(KAFKA_SOURCE_OFFSET_COLUMN, consumerRecord.offset());
+ recordBuilder.set(KAFKA_SOURCE_PARTITION_COLUMN, consumerRecord.partition());
+ recordBuilder.set(KAFKA_SOURCE_TIMESTAMP_COLUMN, consumerRecord.timestamp());
+ return recordBuilder.build();
+ }
+
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 61d03bc0b58..29d8c9bbe78 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -67,6 +67,7 @@ import org.apache.hudi.utilities.DummySchemaProvider;
import org.apache.hudi.utilities.HoodieClusteringJob;
import org.apache.hudi.utilities.HoodieIndexer;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
import org.apache.hudi.utilities.sources.CsvDFSSource;
@@ -123,6 +124,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -148,6 +150,9 @@ import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -1667,9 +1672,13 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
}
private static void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName) {
+ prepareJsonKafkaDFSFiles(numRecords, createTopic, topicName, 2);
+ }
+
+ private static void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName, int numPartitions) {
if (createTopic) {
try {
- testUtils.createTopic(topicName, 2);
+ testUtils.createTopic(topicName, numPartitions);
} catch (TopicExistsException e) {
// no op
}
@@ -1804,10 +1813,10 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
}
private void prepareJsonKafkaDFSSource(String propsFileName, String autoResetValue, String topicName) throws IOException {
- prepareJsonKafkaDFSSource(propsFileName, autoResetValue, topicName, null);
+ prepareJsonKafkaDFSSource(propsFileName, autoResetValue, topicName, null, false);
}
- private void prepareJsonKafkaDFSSource(String propsFileName, String autoResetValue, String topicName, Map<String,String> extraProps) throws IOException {
+ private void prepareJsonKafkaDFSSource(String propsFileName, String autoResetValue, String topicName, Map<String,String> extraProps, boolean shouldAddOffsets) throws IOException {
// Properties used for testing delta-streamer with JsonKafka source
TypedProperties props = new TypedProperties();
populateAllCommonProps(props, basePath, testUtils.brokerAddress());
@@ -1824,7 +1833,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
if (extraProps != null && !extraProps.isEmpty()) {
extraProps.forEach(props::setProperty);
}
-
+ props.setProperty(KafkaOffsetPostProcessor.Config.KAFKA_APPEND_OFFSETS.key(), Boolean.toString(shouldAddOffsets));
UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, basePath + "/" + propsFileName);
}
@@ -1895,6 +1904,44 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
TestHelpers.assertRecordCount(totalRecords, tableBasePath, sqlContext);
}
+ @Test
+ public void testJsonKafkaDFSSourceWithOffsets() throws Exception {
+ topicName = "topic" + testNum;
+ int numRecords = 15;
+ int numPartitions = 3;
+ int recsPerPartition = numRecords / numPartitions;
+ long beforeTime = Instant.now().toEpochMilli();
+ prepareJsonKafkaDFSFiles(numRecords, true, topicName, numPartitions);
+ prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest", topicName, null, true);
+ String tableBasePath = basePath + "/test_json_kafka_offsets_table" + testNum;
+ HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+ TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
+ Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
+ true, 100000, false, null, null, "timestamp", null), jsc);
+ deltaStreamer.sync();
+ sqlContext.clearCache();
+ Dataset<Row> ds = sqlContext.read().format("org.apache.hudi").load(tableBasePath);
+ assertEquals(numRecords, ds.count());
+ //ensure that kafka partition column exists and is populated correctly
+ for (int i = 0; i < numPartitions; i++) {
+ assertEquals(recsPerPartition, ds.filter(KAFKA_SOURCE_PARTITION_COLUMN + "=" + i).count());
+ }
+
+ //ensure that kafka timestamp column exists and is populated correctly
+ long afterTime = Instant.now().toEpochMilli();
+ assertEquals(numRecords, ds.filter(KAFKA_SOURCE_TIMESTAMP_COLUMN + ">" + beforeTime).filter(KAFKA_SOURCE_TIMESTAMP_COLUMN + "<" + afterTime).count());
+
+
+ //ensure that kafka offset column exists and is populated correctly
+ sqlContext.read().format("org.apache.hudi").load(tableBasePath).col(KAFKA_SOURCE_OFFSET_COLUMN);
+ for (int i = 0; i < recsPerPartition; i++) {
+ for (int j = 0; j < numPartitions; j++) {
+ //each offset partition pair should be unique
+ assertEquals(1, ds.filter(KAFKA_SOURCE_OFFSET_COLUMN + "=" + i).filter(KAFKA_SOURCE_PARTITION_COLUMN + "=" + j).count());
+ }
+ }
+ }
+
@Test
public void testKafkaTimestampType() throws Exception {
topicName = "topic" + testNum;
@@ -1948,7 +1995,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
prepareJsonKafkaDFSFiles(20, true, topicName);
Map<String, String> kafkaExtraProps = new HashMap<>();
kafkaExtraProps.put(MUTLI_WRITER_SOURCE_CHECKPOINT_ID.key(), "kafka");
- prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest", topicName, kafkaExtraProps);
+ prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest", topicName, kafkaExtraProps, false);
// delta streamer w/ json kafka source
HoodieDeltaStreamer kafkaDs = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
new file mode 100644
index 00000000000..cb81ad605ed
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
+import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.streaming.kafka010.KafkaTestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+
+public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness {
+ protected static final String TEST_TOPIC_PREFIX = "hoodie_avro_test_";
+
+ protected static KafkaTestUtils testUtils;
+
+ protected static HoodieTestDataGenerator dataGen;
+
+ protected static String SCHEMA_PATH = "/tmp/schema_file.avsc";
+
+ protected final HoodieDeltaStreamerMetrics metrics = mock(HoodieDeltaStreamerMetrics.class);
+
+ protected SchemaProvider schemaProvider;
+
+ @BeforeAll
+ public static void initClass() {
+ testUtils = new KafkaTestUtils();
+ dataGen = new HoodieTestDataGenerator(0xDEED);
+ testUtils.setup();
+ }
+
+ @AfterAll
+ public static void cleanupClass() {
+ testUtils.teardown();
+ }
+
+ protected TypedProperties createPropsForKafkaSource(String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy) {
+ TypedProperties props = new TypedProperties();
+ props.setProperty("hoodie.deltastreamer.source.kafka.topic", topic);
+ props.setProperty("bootstrap.servers", testUtils.brokerAddress());
+ props.setProperty("auto.offset.reset", resetStrategy);
+ props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
+ maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) :
+ String.valueOf(KafkaOffsetGen.Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP.defaultValue()));
+ props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+ return props;
+ }
+
+ void sendMessagesToKafka(String topic, int count, int numPartitions) {
+ List<GenericRecord> genericRecords = dataGen.generateGenericRecords(count);
+ Properties config = getProducerProperties();
+ try (Producer<String, byte[]> producer = new KafkaProducer<>(config)) {
+ for (int i = 0; i < genericRecords.size(); i++) {
+ // use consistent keys to get even spread over partitions for test expectations
+ producer.send(new ProducerRecord<>(topic, i % numPartitions, "key", HoodieAvroUtils.avroToBytes(genericRecords.get(i))));
+ }
+ }
+ }
+
+ private Properties getProducerProperties() {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", testUtils.brokerAddress());
+ props.put("value.serializer", ByteArraySerializer.class.getName());
+ props.put("value.deserializer", ByteArraySerializer.class.getName());
+ // Key serializer is required.
+ props.put("key.serializer", StringSerializer.class.getName());
+ props.put("auto.register.schemas", "false");
+ // wait for all in-sync replicas to ack sends
+ props.put("acks", "all");
+ return props;
+ }
+
+ @Test
+ public void testAppendKafkaOffsets() throws IOException {
+ UtilitiesTestBase.Helpers.saveStringsToDFS(new String[] {dataGen.generateGenericRecord().getSchema().toString()}, fs(), SCHEMA_PATH);
+ ConsumerRecord<Object, Object> recordConsumerRecord = new ConsumerRecord<Object,Object>("test", 0, 1L,
+ "test", dataGen.generateGenericRecord());
+ JavaRDD<ConsumerRecord<Object, Object>> rdd = jsc().parallelize(Arrays.asList(recordConsumerRecord));
+ TypedProperties props = new TypedProperties();
+ props.put("hoodie.deltastreamer.source.kafka.topic", "test");
+ props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", SCHEMA_PATH);
+ SchemaProvider schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(
+ UtilHelpers.createSchemaProvider(FilebasedSchemaProvider.class.getName(), props, jsc()), props, jsc(), new ArrayList<>());
+
+ AvroKafkaSource avroKafkaSource = new AvroKafkaSource(props, jsc(), spark(), schemaProvider, null);
+ GenericRecord withoutKafkaOffsets = avroKafkaSource.maybeAppendKafkaOffsets(rdd).collect().get(0);
+
+ props.put(KafkaOffsetPostProcessor.Config.KAFKA_APPEND_OFFSETS.key(), "true");
+ schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(
+ UtilHelpers.createSchemaProvider(FilebasedSchemaProvider.class.getName(), props, jsc()), props, jsc(), new ArrayList<>());
+ avroKafkaSource = new AvroKafkaSource(props, jsc(), spark(), schemaProvider, null);
+ GenericRecord withKafkaOffsets = avroKafkaSource.maybeAppendKafkaOffsets(rdd).collect().get(0);
+ assertEquals(3,withKafkaOffsets.getSchema().getFields().size() - withoutKafkaOffsets.getSchema().getFields().size());
+ }
+
+ @Test
+ public void testAppendKafkaOffsetsSourceFormatAdapter() throws IOException {
+ UtilitiesTestBase.Helpers.saveStringsToDFS(new String[] {dataGen.generateGenericRecord().getSchema().toString()}, fs(), SCHEMA_PATH);
+ final String topic = TEST_TOPIC_PREFIX + "testKafkaOffsetAppend";
+ TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
+
+ props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", SCHEMA_PATH);
+ SchemaProvider schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(
+ UtilHelpers.createSchemaProvider(FilebasedSchemaProvider.class.getName(), props, jsc()), props, jsc(), new ArrayList<>());
+
+ props.put("hoodie.deltastreamer.source.kafka.value.deserializer.class", ByteArrayDeserializer.class.getName());
+ int numPartitions = 3;
+ int numMessages = 15;
+ testUtils.createTopic(topic,numPartitions);
+ sendMessagesToKafka(topic, numMessages, numPartitions);
+ AvroKafkaSource avroKafkaSource = new AvroKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
+ SourceFormatAdapter kafkaSource = new SourceFormatAdapter(avroKafkaSource);
+ Dataset<Row> c = kafkaSource.fetchNewDataInRowFormat(Option.empty(),Long.MAX_VALUE)
+ .getBatch().get();
+ List<String> columns = Arrays.stream(c.columns()).collect(Collectors.toList());
+ props.put(KafkaOffsetPostProcessor.Config.KAFKA_APPEND_OFFSETS.key(), "true");
+
+ schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(
+ UtilHelpers.createSchemaProvider(FilebasedSchemaProvider.class.getName(), props, jsc()), props, jsc(), new ArrayList<>());
+ avroKafkaSource = new AvroKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
+ kafkaSource = new SourceFormatAdapter(avroKafkaSource);
+ Dataset<Row> d = kafkaSource.fetchNewDataInRowFormat(Option.empty(),Long.MAX_VALUE).getBatch().get();
+ assertEquals(numMessages, d.count());
+ for (int i = 0; i < numPartitions; i++) {
+ assertEquals(numMessages / numPartitions, d.filter("_hoodie_kafka_source_partition=" + i).collectAsList().size());
+ }
+ List<String> withKafkaOffsetColumns = Arrays.stream(d.columns()).collect(Collectors.toList());
+ assertEquals(0, d.drop(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN,"city_to_state").except(c.drop("city_to_state")).count());
+ assertEquals(3, withKafkaOffsetColumns.size() - columns.size());
+ List<String> appendList = Arrays.asList(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN);
+ assertEquals(appendList, withKafkaOffsetColumns.subList(withKafkaOffsetColumns.size() - 3, withKafkaOffsetColumns.size()));
+ }
+}
\ No newline at end of file
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index 9e6e5aad661..6e2f4d6e73b 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -19,6 +19,7 @@
package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -29,6 +30,7 @@ import org.apache.hudi.utilities.deltastreamer.BaseErrorTableWriter;
import org.apache.hudi.utilities.deltastreamer.ErrorEvent;
import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -44,17 +46,22 @@ import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
+import java.util.stream.Collectors;
import scala.Tuple2;
import static org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TABLE_BASE_PATH;
import static org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TARGET_TABLE;
import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
import static org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecords;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -63,8 +70,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
*/
public class TestJsonKafkaSource extends BaseTestKafkaSource {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
- private static final HoodieTestDataGenerator DATA_GENERATOR = new HoodieTestDataGenerator(1L);
- static final URL SCHEMA_FILE_URL = TestJsonKafkaSource.class.getClassLoader().getResource("delta-streamer-config/source.avsc");
+ static final URL SCHEMA_FILE_URL = TestJsonKafkaSource.class.getClassLoader().getResource("delta-streamer-config/source_short_trip_uber.avsc");
@BeforeEach
public void init() throws Exception {
@@ -112,7 +118,7 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
// 1. Extract without any checkpoint => get all the data, respecting sourceLimit
assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
// Send 1000 non-null messages to Kafka
- testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+ testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 1000, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
// Send 100 null messages to Kafka
testUtils.sendMessages(topic, new String[100]);
InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
@@ -135,12 +141,13 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
1. Extract without any checkpoint => get all the data, respecting default upper cap since both sourceLimit and
maxEventsFromKafkaSourceProp are set to Long.MAX_VALUE
*/
- testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+ List<HoodieRecord> send1 = dataGenerator.generateInsertsAsPerSchema("000", 1000, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA);
+ testUtils.sendMessages(topic, jsonifyRecords(send1));
InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
assertEquals(1000, fetch1.getBatch().get().count());
// 2. Produce new data, extract new data based on sourceLimit
- testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
+ testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("001", 1000, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
InputBatch<Dataset<Row>> fetch2 =
kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), 1500);
assertEquals(1000, fetch2.getBatch().get().count());
@@ -158,12 +165,12 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
// 1. Extract without any checkpoint => get all the data, respecting sourceLimit
- testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+ testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 1000, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900);
assertEquals(900, fetch1.getBatch().get().count());
// 2. Produce new data, extract new data based on upper cap
- testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
+ testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("001", 1000, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
InputBatch<Dataset<Row>> fetch2 =
kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(500, fetch2.getBatch().get().count());
@@ -193,13 +200,14 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
@Override
void sendMessagesToKafka(String topic, int count, int numPartitions) {
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
- testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", count)));
+ testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", count, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
}
void sendJsonSafeMessagesToKafka(String topic, int count, int numPartitions) {
try {
Tuple2<String, String>[] keyValues = new Tuple2[count];
- String[] records = jsonifyRecords(DATA_GENERATOR.generateInserts("000", count));
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+ String[] records = jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", count, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA));
for (int i = 0; i < count; i++) {
// Drop fields that don't translate to json properly
Map node = OBJECT_MAPPER.readValue(records[i], Map.class);
@@ -256,7 +264,8 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
TopicPartition topicPartition1 = new TopicPartition(topic, 1);
topicPartitions.add(topicPartition1);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
- testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+ testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 1000,
+ HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
testUtils.sendMessages(topic, new String[]{"error_event1", "error_event2"});
TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
@@ -295,4 +304,34 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
}
};
}
+
+ @Test
+ public void testAppendKafkaOffset() {
+ final String topic = TEST_TOPIC_PREFIX + "testKafkaOffsetAppend";
+ int numPartitions = 3;
+ int numMessages = 15;
+ testUtils.createTopic(topic, numPartitions);
+ sendMessagesToKafka(topic, numMessages, numPartitions);
+
+ TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
+ Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
+ SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
+ Dataset<Row> c = kafkaSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE).getBatch().get();
+ assertEquals(numMessages, c.count());
+ List<String> columns = Arrays.stream(c.columns()).collect(Collectors.toList());
+
+ props.put(KafkaOffsetPostProcessor.Config.KAFKA_APPEND_OFFSETS.key(), "true");
+ jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
+ kafkaSource = new SourceFormatAdapter(jsonSource);
+ Dataset<Row> d = kafkaSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE).getBatch().get();
+ assertEquals(numMessages, d.count());
+ for (int i = 0; i < numPartitions; i++) {
+ assertEquals(numMessages / numPartitions, d.filter("_hoodie_kafka_source_partition=" + i).collectAsList().size());
+ }
+ assertEquals(0, d.drop(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN).except(c).count());
+ List<String> withKafkaOffsetColumns = Arrays.stream(d.columns()).collect(Collectors.toList());
+ assertEquals(3, withKafkaOffsetColumns.size() - columns.size());
+ List<String> appendList = Arrays.asList(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN);
+ assertEquals(appendList, withKafkaOffsetColumns.subList(withKafkaOffsetColumns.size() - 3, withKafkaOffsetColumns.size()));
+ }
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java
index 2c1ab1f1c19..d02faabc3e8 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java
@@ -95,7 +95,7 @@ public class TestJsonKafkaSourcePostProcessor extends SparkClientFunctionalTestH
Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
- testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+ testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 1000, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900);
assertEquals(900, fetch1.getBatch().get().count());
@@ -116,7 +116,7 @@ public class TestJsonKafkaSourcePostProcessor extends SparkClientFunctionalTestH
Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
- testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+ testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 1000, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900);
assertNotEquals(900, fetch1.getBatch().get().count());
@@ -136,7 +136,7 @@ public class TestJsonKafkaSourcePostProcessor extends SparkClientFunctionalTestH
Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
- testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+ testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 1000, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
Assertions.assertThrows(HoodieSourcePostProcessException.class,
() -> kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900));
@@ -158,7 +158,7 @@ public class TestJsonKafkaSourcePostProcessor extends SparkClientFunctionalTestH
Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
- testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+ testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 1000, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900);
assertEquals(0, fetch1.getBatch().get().count());