Posted to commits@hudi.apache.org by "danielfordfc (via GitHub)" <gi...@apache.org> on 2023/03/21 13:38:33 UTC

[GitHub] [hudi] danielfordfc opened a new issue, #8258: [SUPPORT] Deltastreamer errors ingesting kafka-streams topics (transactional / exactly_once producers)

danielfordfc opened a new issue, #8258:
URL: https://github.com/apache/hudi/issues/8258

   **Describe the problem you faced**
   
   Deltastreamer is unable to read from a **transactional topic** (one being produced to by a [transactional producer, like kafka-streams](https://medium.com/lydtech-consulting/kafka-streams-transactions-exactly-once-messaging-82194b50900a)). It fails with one of the following errors:
   
   ```bash
   Caused by: java.lang.IllegalArgumentException: requirement failed:
    Failed to get records for compacted spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 after polling for 1000
    
    OR when using AllowNonConsecutiveOffsets=false..
    
   Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 4) (192.168.1.240 executor driver): java.lang.IllegalArgumentException: requirement failed: Failed to get records for spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 5 after polling for 1000
           at scala.Predef$.require(Predef.scala:281)
           at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:143)
   ```
   
   Full stack traces will be linked below.
   
   Our configuration is as follows:
   > Note: this works fine on non-transactional topics, and compacted topics (cleanup.policy=compact) when setting `AllowNonConsecutiveOffsets=true`
   > Note: This isn't a networking or timeout issue, as we'll discuss below.
   
   ``` bash 
   hoodie.datasource.write.recordkey.field=viewtime
   hoodie.datasource.write.partitionpath.field=pageid
   hoodie.deltastreamer.source.kafka.topic=${topic}
   hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/${topic}-value/versions/latest
   schema.registry.url=http://localhost:8081/
   # Kafka Consumer props
   bootstrap.servers=localhost:9092
   auto.offset.reset=earliest
   # Consumer Group
   group.id=hudi-deltastreamer-${topic}
   # Tried adjusting the two settings below, with no effect:
   #isolation.level=read_committed
   #enable.auto.commit=false
   
   # spark.properties
   # We use this by default as some of our topics are compacted
   spark.streaming.kafka.allowNonConsecutiveOffsets=true
   # Lowered from the default of 120,000 to make it fail faster
   spark.streaming.kafka.consumer.poll.ms=1000
   spark.executor.cores=1
   
   spark-submit \
     --master local[1] \
     --num-executors=1 \
     --executor-cores=1 \
     --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
     --conf "spark.driver.extraJavaOptions=${LOG4J_SETTING}" \
     --conf "spark.executor.extraJavaOptions=${LOG4J_SETTING}" \
     --properties-file ~/Sandbox/spark-sandbox/src/main/resources/spark.properties \
    ~/Workspace/github.com/apache/hudi/target/hudi-utilities-bundle_2.12-0.12.1.jar \
    --op INSERT \
    --props /tmp/hoodie-conf-${topic}.properties \
    --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
    --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
    --source-ordering-field viewtime  \
    --table-type COPY_ON_WRITE \
    --target-base-path file://${target_base_path} \
    --target-table $target_table
   ```
   
   We've identified that this is because the `get()` / `compactedNext()` methods of `InternalKafkaConsumer` (in `KafkaDataConsumer.scala`) fail to poll records/batches for these transactional topics.
   
   To demonstrate, we create two simple non-compacted topics: one that we write to transactionally and one that we write to non-transactionally:
   
   ``` shell 
   kafka-topics --bootstrap-server localhost:9092 --create --topic my-transactional-topic --partitions 5 
   kafka-topics --bootstrap-server localhost:9092 --create --topic pageviews --partitions 5
   ```
   
   ``` shell 
   # The following messages can be consumed when produced **without** a transactional producer, but not with one.
   [{:viewtime 100 :userid "User_0" :pageid "Page_0"}
                  {:viewtime 101 :userid "User_1" :pageid "Page_1"}
                  {:viewtime 102 :userid "User_2" :pageid "Page_2"}
                  ...etc...
                  {:viewtime 115 :userid "User_15" :pageid "Page_15"}]
   ```
   
   **Produced 16 messages non-transactionally** -- as you can see, the end/"next available" offset is the one after the last offset containing data in each partition
   ![Screenshot 2023-03-21 at 12 36 09](https://user-images.githubusercontent.com/75728527/226607911-b8a2d0bf-9bf8-470f-b910-626dc2c2a51e.png)
   
   **Produced 16 messages transactionally** -- as you can see, the end/"next available" offset is 2 more than the last offset containing data in each partition, because the end of the transactional batch write placed a commit marker in each partition
   ![Screenshot 2023-03-21 at 12 36 37](https://user-images.githubusercontent.com/75728527/226607926-3e3c7e93-6f70-414f-96e6-b03a47b3128d.png)
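   
   A minimal sketch of the offset arithmetic described above, assuming a simplified in-memory model of one partition. `LogEntry`, `appendBatch`, `endOffset` and `lastDataOffset` are hypothetical names used only for illustration; they are not Kafka or Spark APIs. It is meant only to show why the end offset sits two past the last data offset once a commit marker has been appended:
   
   ```scala
   // Hypothetical model of a single Kafka partition log (not a Kafka or Spark class).
   final case class LogEntry(offset: Long, isControl: Boolean)
   
   // Append n data records; a transactional batch is followed by one commit marker.
   def appendBatch(log: Vector[LogEntry], n: Int, transactional: Boolean): Vector[LogEntry] = {
     val start = log.size.toLong
     val data = (0 until n).map(i => LogEntry(start + i, isControl = false))
     val marker =
       if (transactional) Vector(LogEntry(start + n, isControl = true))
       else Vector.empty[LogEntry]
     log ++ data ++ marker
   }
   
   // The end ("next available") offset is the offset the next appended entry would get.
   def endOffset(log: Vector[LogEntry]): Long = log.size.toLong
   
   // The last offset that actually holds a data record, if any.
   def lastDataOffset(log: Vector[LogEntry]): Option[Long] =
     log.filterNot(_.isControl).lastOption.map(_.offset)
   
   // 5 records in one partition, as in partition 0 of the example topics.
   val plain = appendBatch(Vector.empty, 5, transactional = false)
   val txn = appendBatch(Vector.empty, 5, transactional = true)
   
   println(s"non-transactional: last data ${lastDataOffset(plain)}, end ${endOffset(plain)}") // last data Some(4), end 5
   println(s"transactional:     last data ${lastDataOffset(txn)}, end ${endOffset(txn)}")     // last data Some(4), end 6
   ```
   
   In this model the transactional partition ends at offset 6 while its last data record is at offset 4, which matches the `offsets 0 -> 6` range that KafkaRDD computes for partition 0 in the logs.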
   
   And we see the stack traces mentioned at the bottom:  
   [hoodie-allow-consecutive-off-false.log](https://github.com/apache/hudi/files/11028874/hoodie-allow-consecutive-off-false.log)
   [hoodie-allow-consecutive-off-true.log](https://github.com/apache/hudi/files/11028875/hoodie-allow-consecutive-off-true.log)
   
   
   Notably
   
   **Extra Information gathered from running this locally**
   
   Below we dive into our local example, showing how we get a poll of `[topic-partition] 5`, followed by a poll of `[] 0`, followed by the crash when `AllowNonConsecutiveOffsets=true`.
   
   
   Interestingly, in the log below, with `AllowNonConsecutiveOffsets=false`, the initial poll for partition 0 (which, per the screenshot above, has offsets 0->4 as valid messages and offset 5 as the commit marker) returns those first 5 messages, and then the next poll fails.
   ```
   23/03/21 12:48:57 INFO org.apache.spark.streaming.kafka010.KafkaRDD: Computing topic my-transactional-topic, partition 0 offsets 0 -> 6
   23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.KafkaDataConsumer: Not used cached consumer found, re-using it InternalKafkaConsumer(hash=511066e5, groupId=spark-executor-hudi-deltastreamer-my-transactional-topic, topicPartition=my-transactional-topic-0)
   23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Get spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 nextOffset 1 requested 0
   23/03/21 12:48:57 INFO org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Initial fetch for spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 0
   23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Seeking to my-transactional-topic-0 0
   23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Polled [my-transactional-topic-0]  5
   23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Get spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 nextOffset 1 requested 1
   23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Get spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 nextOffset 2 requested 2
   23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Get spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 nextOffset 3 requested 3
   23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Get spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 nextOffset 4 requested 4
   23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Get spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 nextOffset 5 requested 5
   23/03/21 12:48:58 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Polled []  0
   23/03/21 12:48:59 WARN org.apache.spark.storage.BlockManager: Putting block rdd_2_0 failed due to exception java.lang.IllegalArgumentException: requirement failed: Failed to get records for spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 5 after polling for 1000.
   ```
   
   If we create another topic with one partition and write a single batch of 16 records transactionally (so 0->15 is data, 16 is commit marker, end of topic is 17), we see similar behaviour.
   
   [my-transactional-topic-single-partition.log](https://github.com/apache/hudi/files/11028871/my-transactional-topic-single-partition.log)
   
   
   To rule out the possibility that it crashes because the endOffset is the "invisible" marker that it can't read, we added another 16 records (putting 17->32 as data, 33 as the marker and 34 as endOffset). We see a similar issue, with the following error:
   
   ```
   requirement failed: Got wrong record for spark-executor-hudi-deltastreamer-my-transactional-topic-single-partition my-transactional-topic-single-partition-0 even after seeking to offset 16 got offset 17 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets
   ```
   [my-transactional-topic-single-partition-32-msgs.log](https://github.com/apache/hudi/files/11028880/my-transactional-topic-single-partition-32-msgs.log)
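   
   The "seeking to offset 16 got offset 17" failure can be sketched with a simplified model. This is an illustration under stated assumptions, not Spark's actual implementation: `LogEntry`, `fetchFrom` and `readStrict` are hypothetical names, and the model assumes the consumer never returns a commit marker to the caller, so a fetch at a marker offset yields the next data record instead.
   
   ```scala
   // Hypothetical model of a partition log entry (not a Kafka or Spark class).
   final case class LogEntry(offset: Long, isControl: Boolean)
   
   // Two transactional batches of 16 records:
   // data 0-15, marker 16, data 17-32, marker 33, end offset 34.
   val log: Vector[LogEntry] =
     (0L until 34L).map(o => LogEntry(o, isControl = (o == 16L || o == 33L))).toVector
   
   // Simplified fetch: the first data record at or after `from`.
   // Assumption: commit markers are never handed to the caller.
   def fetchFrom(from: Long): Option[LogEntry] =
     log.find(e => e.offset >= from && !e.isControl)
   
   // Strict read of [requested, untilOffset): every single offset must come back exactly.
   def readStrict(requested: Long, untilOffset: Long,
                  acc: Vector[Long] = Vector.empty): Either[String, Vector[Long]] =
     if (requested >= untilOffset) Right(acc)
     else {
       fetchFrom(requested) match {
         case Some(e) if e.offset == requested =>
           readStrict(requested + 1, untilOffset, acc :+ e.offset)
         case Some(e) =>
           Left(s"even after seeking to offset $requested got offset ${e.offset} instead")
         case None =>
           Left(s"failed to get records for offset $requested")
       }
     }
   
   println(readStrict(0L, 34L))
   // Left(even after seeking to offset 16 got offset 17 instead)
   ```
   
   Under these assumptions, a reader that insists on consecutive offsets stops at the first commit marker, even though data exists after it.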
   
   
   Changing to `AllowNonConsecutiveOffsets=true` on the above topic yields the following:
   ```
   23/03/21 13:24:10 INFO org.apache.spark.streaming.kafka010.KafkaRDD: Computing topic my-transactional-topic-single-partition, partition 0 offsets 0 -> 34
   23/03/21 13:24:10 DEBUG org.apache.spark.streaming.kafka010.KafkaDataConsumer: Not used cached consumer found, re-using it InternalKafkaConsumer(hash=9903e40, groupId=spark-executor-hudi-deltastreamer-my-transactional-topic-single-partition, topicPartition=my-transactional-topic-single-partition-0)
   23/03/21 13:24:10 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: compacted start spark-executor-hudi-deltastreamer-my-transactional-topic-single-partition my-transactional-topic-single-partition-0 starting 0
   23/03/21 13:24:10 INFO org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Initial fetch for compacted spark-executor-hudi-deltastreamer-my-transactional-topic-single-partition my-transactional-topic-single-partition-0 0
   23/03/21 13:24:10 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Seeking to my-transactional-topic-single-partition-0 0
   23/03/21 13:24:10 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Polled [my-transactional-topic-single-partition-0]  32
   23/03/21 13:24:10 INFO org.apache.spark.storage.BlockManager: Removing RDD 6
   23/03/21 13:24:11 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Polled []  0
   23/03/21 13:24:11 WARN org.apache.spark.storage.BlockManager: Putting block rdd_2_0 failed due to exception java.lang.IllegalArgumentException: requirement failed: Failed to get records for compacted spark-executor-hudi-deltastreamer-my-transactional-topic-single-partition my-transactional-topic-single-partition-0 after polling for 1000.
   23/03/21 13:24:11 WARN org.apache.spark.storage.BlockManager: Block rdd_2_0 could not be removed as it was not found on disk or in memory
   23/03/21 13:24:11 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 4.0 (TID 4)
   java.lang.IllegalArgumentException: requirement failed: Failed to get records for compacted spark-executor-hudi-deltastreamer-my-transactional-topic-single-partition my-transactional-topic-single-partition-0 after polling for 1000
   ```
   Stack trace for the above:
   [my-transactional-topic-single-partition-allownonconsecutiveoffsetsTrue.log](https://github.com/apache/hudi/files/11028882/my-transactional-topic-single-partition-allownonconsecutiveoffsetsTrue.log)
   
   **Answers Required**
   
   We know what the problem is; we are just unsure how to fix it.
   We've taken this to the Hudi office hours before, and the host suggested asking @yihua for advice.
   
   
   **Usual Environment in Production, but all of this has been reproduced locally**
   
   Hudi version : Deltastreamer on EMR 6.8.0 running Hudi 0.11.1-amzn-0
   Spark version : 3.3.0
   Hive version : 3.1.3
   Hadoop version : Amazon 3.2.1
   Storage (HDFS/S3/GCS..) : S3
   Running on Docker? (yes/no) : No
   
   **Additional context**
   
   hudi 0.12.1 used for local testing
   Can add more details if required.
   
   **Stacktrace**
   Stack traces are linked throughout, but are listed here again:
   
   [my-transactional-topic-single-partition-32-msgs.log](https://github.com/apache/hudi/files/11028884/my-transactional-topic-single-partition-32-msgs.log)
   [my-transactional-topic-single-partition-allownonconsecutiveoffsetsTrue.log](https://github.com/apache/hudi/files/11028885/my-transactional-topic-single-partition-allownonconsecutiveoffsetsTrue.log)
   [hoodie-allow-consecutive-off-false.log](https://github.com/apache/hudi/files/11028886/hoodie-allow-consecutive-off-false.log)
   [hoodie-allow-consecutive-off-true.log](https://github.com/apache/hudi/files/11028887/hoodie-allow-consecutive-off-true.log)
   [my-transactional-topic-single-partition.log](https://github.com/apache/hudi/files/11028888/my-transactional-topic-single-partition.log)
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] ad1happy2go commented on issue #8258: [SUPPORT] Deltastreamer errors ingesting kafka-streams topics (transactional / exactly_once producers)

Posted by "ad1happy2go (via GitHub)" <gi...@apache.org>.
ad1happy2go commented on issue #8258:
URL: https://github.com/apache/hudi/issues/8258#issuecomment-1486214301

   @WarFox 
   We can either enable async compaction or run an offline compaction job.
   With streaming, we normally don't use inline compaction, as that will increase the latency.




Re: [I] [SUPPORT] Deltastreamer errors ingesting kafka-streams topics (transactional / exactly_once producers) [hudi]

Posted by "danielfordfc (via GitHub)" <gi...@apache.org>.
danielfordfc commented on issue #8258:
URL: https://github.com/apache/hudi/issues/8258#issuecomment-2047013315

   @ad1happy2go @nsivabalan is this absolutely not going to happen anytime soon? It's preventing us from directly ingesting a large majority of the Kafka topics in our organisation, and I'm very surprised it's not a more widely experienced issue, given it's a common feature of topics produced through kafka-streams applications.




[GitHub] [hudi] WarFox commented on issue #8258: [SUPPORT] Deltastreamer errors ingesting kafka-streams topics (transactional / exactly_once producers)

Posted by "WarFox (via GitHub)" <gi...@apache.org>.
WarFox commented on issue #8258:
URL: https://github.com/apache/hudi/issues/8258#issuecomment-1484047309

   Thanks @ad1happy2go 
   
   We are also leaning towards using either Spark Structured Streaming or batch processing as a workaround. We have some working examples for both cases now. Thanks again for sharing your example code too.
   
   Could you confirm if we need to have a separate process for compaction when using Spark Structured streaming for writing to Hudi?
   




Re: [I] [SUPPORT] Deltastreamer errors ingesting kafka-streams topics (transactional / exactly_once producers) [hudi]

Posted by "ad1happy2go (via GitHub)" <gi...@apache.org>.
ad1happy2go commented on issue #8258:
URL: https://github.com/apache/hudi/issues/8258#issuecomment-2049456472

   Currently I don't have a viable solution to fix this, and I'm not sure whether anybody is looking into it, so it is not going to happen anytime soon.
   @bvaradar Do you have any insights into how we can handle transactional topics? PR - https://github.com/apache/hudi/pull/9059




[GitHub] [hudi] WarFox commented on issue #8258: [SUPPORT] Deltastreamer errors ingesting kafka-streams topics (transactional / exactly_once producers)

Posted by "WarFox (via GitHub)" <gi...@apache.org>.
WarFox commented on issue #8258:
URL: https://github.com/apache/hudi/issues/8258#issuecomment-1481350315

   More observations on this
   
   - The problem occurs when the transaction marker is the last offset to be consumed
   - Ingestion succeeds when there are non-transactional messages after the transaction marker offset
   
   As seen in the logs attached above, we suspect that the underlying Spark KafkaRDD library mishandles the transaction marker when it is the last record in the topic.
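   
   These two observations can be sketched with a simplified model of a gap-tolerant read. This is an illustration only, not Spark's actual `compactedNext()` logic: `LogEntry` and `readAllowingGaps` are hypothetical names, and an empty fetch stands in for the poll timing out.
   
   ```scala
   // Hypothetical model of a partition log entry (not a Kafka or Spark class).
   final case class LogEntry(offset: Long, isControl: Boolean)
   
   // Gap-tolerant read of [fromOffset, untilOffset): skipped offsets are accepted,
   // but an empty fetch before untilOffset is reached counts as a failure
   // (standing in for "Failed to get records ... after polling for 1000").
   def readAllowingGaps(log: Vector[LogEntry], fromOffset: Long,
                        untilOffset: Long): Either[String, Vector[Long]] = {
     def loop(next: Long, acc: Vector[Long]): Either[String, Vector[Long]] =
       if (next >= untilOffset) Right(acc)
       else {
         // Assumption: commit markers are never returned to the reader.
         log.find(e => e.offset >= next && !e.isControl) match {
           case Some(e) => loop(e.offset + 1, acc :+ e.offset)
           case None    => Left(s"failed to get records at offset $next before reaching $untilOffset")
         }
       }
     loop(fromOffset, Vector.empty)
   }
   
   // 16 transactional records: data 0-15, commit marker at 16, end offset 17.
   val markerLast = (0L to 16L).map(o => LogEntry(o, isControl = (o == 16L))).toVector
   // The same log with one non-transactional record appended at 17, end offset 18.
   val dataAfterMarker = markerLast :+ LogEntry(17L, isControl = false)
   
   println(readAllowingGaps(markerLast, 0L, 17L).isLeft)       // true: only the marker remains
   println(readAllowingGaps(dataAfterMarker, 0L, 18L).isRight) // true: the marker is just a gap
   ```
   
   In this model the read fails only when nothing readable lies between the last data record and the requested end offset, which is the case when the marker is the final entry.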




[GitHub] [hudi] ad1happy2go commented on issue #8258: [SUPPORT] Deltastreamer errors ingesting kafka-streams topics (transactional / exactly_once producers)

Posted by "ad1happy2go (via GitHub)" <gi...@apache.org>.
ad1happy2go commented on issue #8258:
URL: https://github.com/apache/hudi/issues/8258#issuecomment-1484045330

   I am able to reproduce your issue with HoodieDeltaStreamer. We are looking into this issue. 
   
   Meanwhile, you can use Spark Structured Streaming to read the data from the transactional topic and write to Hudi. I am not seeing any errors when using Structured Streaming. Below is code you can refer to.
   
   ```scala
   import io.confluent.kafka.schemaregistry.client.rest.RestService
   import io.confluent.kafka.serializers.KafkaAvroDeserializer
   import org.apache.avro.Schema
   
   val schemaRegistryURL = "http://localhost:8081"
   
   val topicName = "issue_8258_avro"
   val subjectValueName = topicName + "-value"
   
   //create RestService object
   val restService = new RestService(schemaRegistryURL)
   
   val valueRestResponseSchema = restService.getLatestVersion(subjectValueName)
   
   //Use Avro parsing classes to get Avro Schema
   val parser = new Schema.Parser
   val topicValueAvroSchema: Schema = parser.parse(valueRestResponseSchema.getSchema)
   
   //key schema is typically just string but you can do the same process for the key as the value
   val keySchemaString = "\"string\""
   val keySchema = parser.parse(keySchemaString)
   
   //Create a map with the Schema registry url.
   //This is the only Required configuration for Confluent's KafkaAvroDeserializer.
   val props = Map("schema.registry.url" -> schemaRegistryURL)
   
   //Declare SerDe vars before using Spark structured streaming map. Avoids non serializable class exception.
   var keyDeserializer: KafkaAvroDeserializer = null
   var valueDeserializer: KafkaAvroDeserializer = null
   
   //Create structured streaming DF to read from the topic.
   val rawTopicMessageDF = spark.readStream.format("kafka").
     option("kafka.bootstrap.servers", "localhost:9092").
     option("subscribe", topicName).
     option("startingOffsets", "earliest").
     option("maxOffsetsPerTrigger", 20).
     load()
   
   import org.apache.hudi.QuickstartUtils._
   import scala.collection.JavaConversions._
   import org.apache.spark.sql.SaveMode._
   import org.apache.hudi.DataSourceReadOptions._
   import org.apache.hudi.DataSourceWriteOptions._
   import org.apache.hudi.config.HoodieWriteConfig._
   import org.apache.hudi.common.model.HoodieRecord
   
   import org.apache.spark.sql.streaming.Trigger
   
   rawTopicMessageDF.writeStream.format("hudi").
     options(getQuickstartWriteConfigs).
     option(PRECOMBINE_FIELD_OPT_KEY, "timestamp").
     option(RECORDKEY_FIELD_OPT_KEY, "offset").
     option(TABLE_NAME, topicName).
     outputMode("append").
     option("path", "file:///tmp/" + topicName).
     option("checkpointLocation", "file:///tmp/ck_" + topicName).
     trigger(Trigger.Once()).
     start()
   
   spark.read.format("hudi").load("file:///tmp/" + topicName).count()
   ```




[GitHub] [hudi] ad1happy2go commented on issue #8258: [SUPPORT] Deltastreamer errors ingesting kafka-streams topics (transactional / exactly_once producers)

Posted by "ad1happy2go (via GitHub)" <gi...@apache.org>.
ad1happy2go commented on issue #8258:
URL: https://github.com/apache/hudi/issues/8258#issuecomment-1569855643

   @danielfordfc JIRA created to track the fix - https://issues.apache.org/jira/browse/HUDI-6297

