Posted to commits@hudi.apache.org by "ad1happy2go (via GitHub)" <gi...@apache.org> on 2023/03/26 09:41:04 UTC
[GitHub] [hudi] ad1happy2go commented on issue #8258: [SUPPORT] Deltastreamer errors ingesting kafka-streams topics (transactional / exactly_once producers)
ad1happy2go commented on issue #8258:
URL: https://github.com/apache/hudi/issues/8258#issuecomment-1484045330
I am able to reproduce your issue with HoodieDeltaStreamer, and we are looking into it.
Meanwhile, you can use Spark Structured Streaming to read the data from the transactional topic and write it to Hudi. I did not see any errors with Structured Streaming. You can refer to the code below.
`import io.confluent.kafka.schemaregistry.client.rest.RestService
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.Schema
val schemaRegistryURL = "http://localhost:8081"
val topicName = "issue_8258_avro"
val subjectValueName = topicName + "-value"
//create RestService object
val restService = new RestService(schemaRegistryURL)
val valueRestResponseSchema = restService.getLatestVersion(subjectValueName)
//Use Avro parsing classes to get Avro Schema
val parser = new Schema.Parser
val topicValueAvroSchema: Schema = parser.parse(valueRestResponseSchema.getSchema)
//key schema is typically just string but you can do the same process for the key as the value
val keySchemaString = "\"string\""
val keySchema = parser.parse(keySchemaString)
//Create a map with the Schema registry url.
//This is the only Required configuration for Confluent's KafkaAvroDeserializer.
val props = Map("schema.registry.url" -> schemaRegistryURL)
//Declare SerDe vars before using Spark structured streaming map. Avoids non serializable class exception.
//Note: they are not used by the write below, which stores the raw Kafka columns as-is.
var keyDeserializer: KafkaAvroDeserializer = null
var valueDeserializer: KafkaAvroDeserializer = null
//Create structured streaming DF to read from the topic.
val rawTopicMessageDF = spark.readStream.format("kafka").
option("kafka.bootstrap.servers", "localhost:9092").
option("subscribe", topicName).
option("startingOffsets", "earliest").
option("maxOffsetsPerTrigger", 20).
load()
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord
import org.apache.spark.sql.streaming.Trigger
rawTopicMessageDF.writeStream.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "timestamp").
option(RECORDKEY_FIELD_OPT_KEY, "offset").
option(TABLE_NAME, topicName).
outputMode("append").
option("path", "file:///tmp/" + topicName).
option("checkpointLocation", "file:///tmp/ck_" + topicName).
trigger(Trigger.Once()).
start().
awaitTermination() //block until the one-off batch finishes, so the count below sees the commit
spark.read.format("hudi").load("file:///tmp/" + topicName).count()
`
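The snippet above writes the raw Kafka columns, so `value` stays in Confluent's binary wire format: one magic byte (0), a 4-byte big-endian schema ID, then the Avro payload. If you want to inspect the schema ID of a message before decoding it, a minimal sketch could look like the one below. `ConfluentFrame` and `ConfluentWireFormat` are hypothetical helper names used only for illustration; they are not part of Hudi, Spark, or the Confluent client.

```scala
import java.nio.ByteBuffer

// Hypothetical helper types for illustration only (not a Hudi/Spark/Confluent API).
final case class ConfluentFrame(schemaId: Int, avroPayload: Array[Byte])

object ConfluentWireFormat {
  private val MagicByte: Byte = 0x0
  private val HeaderSize = 5 // 1 magic byte + 4-byte schema ID

  // Splits a Confluent-framed message into its schema ID and Avro payload.
  // Returns Left with a reason if the bytes do not look like a framed message.
  def split(bytes: Array[Byte]): Either[String, ConfluentFrame] =
    if (bytes == null || bytes.length < HeaderSize)
      Left("message is shorter than the 5-byte header")
    else if (bytes(0) != MagicByte)
      Left(s"unexpected magic byte: ${bytes(0)}")
    else {
      // ByteBuffer reads big-endian by default, which matches the wire format.
      val schemaId = ByteBuffer.wrap(bytes, 1, 4).getInt
      Right(ConfluentFrame(schemaId, bytes.drop(HeaderSize)))
    }
}

// Example: header for schema ID 42 followed by two payload bytes.
val sample = Array[Byte](0, 0, 0, 0, 42, 2, 10)
val parsed = ConfluentWireFormat.split(sample)
```

In a real job you would normally let `KafkaAvroDeserializer` handle this framing; the sketch is only meant to show what the `value` bytes contain.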