Posted to commits@hudi.apache.org by "ad1happy2go (via GitHub)" <gi...@apache.org> on 2023/03/26 09:41:04 UTC

[GitHub] [hudi] ad1happy2go commented on issue #8258: [SUPPORT] Deltastreamer errors ingesting kafka-streams topics (transactional / exactly_once producers)

ad1happy2go commented on issue #8258:
URL: https://github.com/apache/hudi/issues/8258#issuecomment-1484045330

   I am able to reproduce your issue with HoodieDeltaStreamer, and we are looking into it.

   In the meantime, you can use Spark Structured Streaming to read the data from the transactional topic and write it to Hudi. I do not see any error when using Structured Streaming. Below is code you can refer to.
   
   ```scala
   import io.confluent.kafka.schemaregistry.client.rest.RestService
   import io.confluent.kafka.serializers.KafkaAvroDeserializer
   import org.apache.avro.Schema
   
   
   val schemaRegistryURL = "http://localhost:8081"
   
   val topicName = "issue_8258_avro"
   val subjectValueName = topicName + "-value"
   
   //create RestService object
   val restService = new RestService(schemaRegistryURL)
   
   val valueRestResponseSchema = restService.getLatestVersion(subjectValueName)
   
   //Use Avro parsing classes to get Avro Schema
   val parser = new Schema.Parser
   val topicValueAvroSchema: Schema = parser.parse(valueRestResponseSchema.getSchema)
   
   //The key schema is typically just a string, but you can follow the same process for the key as for the value.
   val keySchemaString = "\"string\""
   val keySchema = parser.parse(keySchemaString)
   
   //Create a map with the schema registry URL.
   //This is the only required configuration for Confluent's KafkaAvroDeserializer.
   val props = Map("schema.registry.url" -> schemaRegistryURL)
   
   //Declare SerDe vars before using them inside a Spark Structured Streaming map; this avoids a non-serializable class exception.
   var keyDeserializer: KafkaAvroDeserializer = null
   var valueDeserializer: KafkaAvroDeserializer = null
   
   //Create structured streaming DF to read from the topic.
   val rawTopicMessageDF = spark.readStream.
     format("kafka").
     option("kafka.bootstrap.servers", "localhost:9092").
     option("subscribe", topicName).
     option("startingOffsets", "earliest").
     option("maxOffsetsPerTrigger", 20).
     load()
   
   
   import org.apache.hudi.QuickstartUtils._
   import scala.collection.JavaConversions._
   import org.apache.spark.sql.SaveMode._
   import org.apache.hudi.DataSourceReadOptions._
   import org.apache.hudi.DataSourceWriteOptions._
   import org.apache.hudi.config.HoodieWriteConfig._
   import org.apache.hudi.common.model.HoodieRecord
   
   
   import org.apache.spark.sql.streaming.Trigger
   
     //Write the raw Kafka columns (key, value, offset, timestamp, ...) to a Hudi table,
     //using the Kafka offset as the record key and the Kafka timestamp as the precombine field.
     val query = rawTopicMessageDF.writeStream.format("hudi").
       options(getQuickstartWriteConfigs).
       option(PRECOMBINE_FIELD_OPT_KEY, "timestamp").
       option(RECORDKEY_FIELD_OPT_KEY, "offset").
       option(TABLE_NAME, topicName).
       outputMode("append").
       option("path", "file:///tmp/" + topicName).
       option("checkpointLocation", "file:///tmp/ck_" + topicName).
       trigger(Trigger.Once()).
       start()

     //Wait for the Trigger.Once batch to complete before reading the table back.
     query.awaitTermination()

     spark.read.format("hudi").load("file:///tmp/" + topicName).count()
   ```
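   Note that the snippet above writes the raw Kafka columns to Hudi and never actually invokes the declared deserializers. If you want to persist the decoded payload instead, below is a minimal sketch of that extra step. It is illustrative only (not part of the snippet above): it reuses rawTopicMessageDF, topicName, schemaRegistryURL, and the valueDeserializer var declared earlier, and simply keeps the JSON string form of each Avro record; adapt the selected columns and the record key / precombine fields to your own schema.

   ```scala
   //Illustrative sketch: decode the Avro value bytes with the Confluent deserializer.
   import spark.implicits._

   val decodedDF = rawTopicMessageDF.
     selectExpr("offset", "timestamp", "CAST(key AS STRING) AS key", "value").
     as[(Long, java.sql.Timestamp, String, Array[Byte])].
     map { case (offset, timestamp, key, value) =>
       //Lazily initialize on the executor; the var was declared earlier to avoid
       //a non-serializable closure.
       if (valueDeserializer == null) {
         valueDeserializer = new KafkaAvroDeserializer()
         val conf = new java.util.HashMap[String, AnyRef]()
         conf.put("schema.registry.url", schemaRegistryURL)
         valueDeserializer.configure(conf, false)
       }
       //deserialize() returns an Avro GenericRecord; toString gives its JSON form.
       val json = valueDeserializer.deserialize(topicName, value).toString
       (offset, timestamp, key, json)
     }.toDF("offset", "timestamp", "key", "value_json")
   ```

   decodedDF can then be written with the same writeStream options as above, for example using a field from the decoded payload as the record key instead of the Kafka offset.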


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org