Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/02/05 03:25:33 UTC

[GitHub] [hudi] jiangok2006 opened a new issue #2537: [SUPPORT] delta streamer sql transformer does not work for my kafka data

jiangok2006 opened a new issue #2537:
URL: https://github.com/apache/hudi/issues/2537


   Lian Jiang Jan 28th at 7:07 AM
   I found that hudi delta streamer has trouble to use transformer. The upstream data from kafka has Timestamp type instead of long type of time field. I cannot change the upstream data and deltastreamer does not support Timestamp type in TimestampBasedKeyGenerator.  So I have to use a transformer to convert the timestamp type. But a transformer makes the delta streamer fail due to error like "Commit 20210128140331 failed and rolled-back !". Even a simple sql query like below will throw that error:
   
   --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
   --hoodie-conf  hoodie.deltastreamer.transformer.sql="SELECT * FROM <SRC>" \
   
   Any idea? I am using 0.6.0. This is blocking us from using DeltaStreamer in our pipeline. Thanks for any quick response. (edited)
   6 replies
   
   
   Sudha  7 days ago
   Hi Lian, can you paste the full stack trace of that error?
   
   
   Lian Jiang  7 days ago
   Thanks @Sudha. Here is the stack trace:
   
   Exception in thread "main" org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: Commit 20210128193620 failed and rolled-back !
   	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:152)
   	at org.apache.hudi.common.util.Option.ifPresent(Option.java:96)
   	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:147)
   	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:464)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
   	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)
   	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
   	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
   	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
   	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
   	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
   	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: Commit 20210128193620 failed and rolled-back !
   	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
   	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
   	at org.apache.hudi.async.AbstractAsyncService.waitForShutdown(AbstractAsyncService.java:79)
   	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:150)
   	... 15 more
   Caused by: org.apache.hudi.exception.HoodieException: Commit 20210128193620 failed and rolled-back !
   	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:595)
   	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:748)
   Caused by: org.apache.hudi.exception.HoodieException: Commit 20210128193620 failed and rolled-back !
   	at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:442)
   	at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:244)
   	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:579)
   	... 4 more
   21/01/28 19:37:05 INFO ShutdownHookManager: Shutdown hook called
   21/01/28 19:37:05 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-e69f8874-bc3b-4d24-83b0-a3701131dcd2
   21/01/28 19:37:05 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-c25b42a0-116e-4d0e-ae05-c9528365fead
   
   Here is the command:
   Lian Jiang  7 days ago
   
   spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.4,org.apache.hudi:hudi-utilities-bundle_2.11:0.6.0 \
                   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
                   https://repo1.maven.org/maven2/org/apache/hudi/hudi-utilities-bundle_2.11/0.6.0/hudi-utilities-bundle_2.11-0.6.0.jar \
                   --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
                   --props /tmp/kafka-source.properties \
                   --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
                   --table-type COPY_ON_WRITE \
                   --target-base-path s3://mylocation \
                   --target-table mytable \
                   --op UPSERT \
                   --continuous \
                   --min-sync-interval-seconds 100 \
                   --source-ordering-field Metadata.processingTime \
                   --hoodie-conf  hoodie.datasource.write.recordkey.field=Metadata.id \
                   --hoodie-conf  hoodie.datasource.write.precombine.field=Metadata.processingTime \
                   --hoodie-conf  hoodie.datasource.write.partitionpath.field=processingTime2 \
                   --hoodie-conf  hoodie.datasource.hive_sync.partition_fields=processingTime2 \
                   --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor \
                   --hoodie-conf  hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator \
                   --hoodie-conf  hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy-MM-dd-HH \
                   --hoodie-conf  hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP \
                   --hoodie-conf  hoodie.datasource.write.hive_style_partitioning=true \
                   --hoodie-conf  bootstrap.servers=my_kafka_broker:6020 \
                   --hoodie-conf  sasl.mechanism=SCRAM-SHA-256 \
                   --hoodie-conf  security.protocol=SASL_SSL \
                   --hoodie-conf  sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule                 required username=\"username\" password=\"password\";" \
                   --hoodie-conf  hoodie.deltastreamer.source.kafka.topic=my_topic \
                   --hoodie-conf  schema.registry.url=https://schema-registry.net:443 \
                   --hoodie-conf  hoodie.deltastreamer.schemaprovider.registry.url=https://schema-registry.net/subjects/mysubject/versions/latest \
                   --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
                   --hoodie-conf  hoodie.deltastreamer.transformer.sql="SELECT *, cast(Metadata.processingTime as long) processingTime2 FROM <SRC>"
   
   Let me know if you need other info. Thanks. (edited) 
   
   
   Sudha  6 days ago
   Hi @Lian Jiang I was expecting to see some error specific to the transformer in the stack trace.  But not seeing anything like that. One thing I noted in you sql is you might want to rewrite like this
   
   SELECT *, cast(a.Metadata.processingTime as long) processingTime2 FROM <SRC> a
   
    Can you try that and check if it helps?
   Lian Jiang  5 days ago
    Thanks @Sudha. Unfortunately, I tried it and still got the same error. I found the error below:
   
   	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:305)
   	at org.apache.hudi.AvroConversionUtils$$anonfun$1.apply(AvroConversionUtils.scala:44)
   	at org.apache.hudi.AvroConversionUtils$$anonfun$1.apply(AvroConversionUtils.scala:44)
   	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
   	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
   	at scala.collection.Iterator$$anon$10.next(Iterator.scala:394)
   	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
   	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
   	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
   	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
   	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
   	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
   	at scala.collection.AbstractIterator.to(Iterator.scala:1334)
   	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
   	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
   	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
   	at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
   	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1364)
   	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1364)
   	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
   	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
   	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
   	at org.apache.spark.scheduler.Task.run(Task.scala:123)
   	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
   	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
   	... 3 more
   Caused by: java.lang.NegativeArraySizeException
   	at org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:297)
   	at org.apache.spark.unsafe.types.UTF8String.toString(UTF8String.java:1226)
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.Invoke_10$(Unknown Source)
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.createExternalRow_0_1$(Unknown Source)
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.If_17$(Unknown Source)
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.If_18$(Unknown Source)
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.CreateExternalRow_1$(Unknown Source)
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.If_41$(Unknown Source)
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection$NestedClass_6.createExternalRow_41_0$(Unknown Source)
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection$NestedClass_13.createExternalRow_85_0$(Unknown Source)
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown Source)
   	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:302)
   	... 28 more
   
    I can use spark-shell and the Hudi lib to simulate what DeltaStreamer does (read a DataFrame, use Spark SQL to cast processingTime to processingTime2, save to Hudi), and the spark-shell run works. However, DeltaStreamer's transformer throws a SerDe exception when handling the Kafka Avro data. Both spark-shell and DeltaStreamer use org.apache.spark:spark-avro_2.11:2.4.4. Any idea? Thanks. (edited)
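
    For reference, a minimal sketch of that spark-shell simulation. It assumes the source records are already loaded into a DataFrame named srcDf with the same schema as the Kafka Avro data (a hypothetical name); the field names, table name, and target path are placeholders taken from the command above.

    ```scala
    // `spark` is the spark-shell session; `srcDf` is assumed to hold the Kafka records.
    srcDf.createOrReplaceTempView("src")

    // The same cast the SQL transformer is expected to perform.
    val transformed = spark.sql(
      "SELECT *, cast(Metadata.processingTime as long) processingTime2 FROM src")

    // Write to Hudi roughly the way DeltaStreamer would (config names match the
    // spark-submit command above; path and table name are placeholders).
    transformed.write
      .format("org.apache.hudi")
      .option("hoodie.table.name", "mytable")
      .option("hoodie.datasource.write.recordkey.field", "Metadata.id")
      .option("hoodie.datasource.write.precombine.field", "Metadata.processingTime")
      .option("hoodie.datasource.write.partitionpath.field", "processingTime2")
      .mode("append")
      .save("s3://mylocation")
    ```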
   
    
   Sudha  3 days ago
    @Lian Jiang  Both use the same version. Could you create a GH issue with this for more support? At this point, I am also out of ideas. Maybe Gary or Balaji can help with this.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] n3nash commented on issue #2537: [SUPPORT] delta streamer sql transformer does not work for my kafka data

Posted by GitBox <gi...@apache.org>.
n3nash commented on issue #2537:
URL: https://github.com/apache/hudi/issues/2537#issuecomment-854307353


   Closing this issue due to no activity. 



[GitHub] [hudi] nsivabalan edited a comment on issue #2537: [SUPPORT] delta streamer sql transformer does not work for my kafka data

Posted by GitBox <gi...@apache.org>.
nsivabalan edited a comment on issue #2537:
URL: https://github.com/apache/hudi/issues/2537#issuecomment-809492266


   @jiangjiguang : let us know once you have some sample avro. We are trying to iron out all issues related to schemas and kafka; it would be good to get this addressed as well.



[GitHub] [hudi] nsivabalan edited a comment on issue #2537: [SUPPORT] delta streamer sql transformer does not work for my kafka data

Posted by GitBox <gi...@apache.org>.
nsivabalan edited a comment on issue #2537:
URL: https://github.com/apache/hudi/issues/2537#issuecomment-809492266


   @jiangjiguang : let us know once you have some sample avro. We are trying to iron out all issues related to schemas and kafka; it would be good to get this addressed as well. Once you respond, can you please remove the "awaiting-user-response" label from the issue and, if possible, add the "awaiting-community-help" label.



[GitHub] [hudi] bvaradar commented on issue #2537: [SUPPORT] delta streamer sql transformer does not work for my kafka data

Posted by GitBox <gi...@apache.org>.
bvaradar commented on issue #2537:
URL: https://github.com/apache/hudi/issues/2537#issuecomment-777182932


   @nsivabalan @jiangok2006 : Yes, the sql query format "select x,y,z from <SRC>" is correct
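
   For illustration, here is a rough sketch of how a SQL-based transformer can resolve the <SRC> placeholder: register the incoming batch as a uniquely named temporary view and substitute the placeholder before running the query. This is a simplified approximation rather than Hudi's actual implementation; srcDf and the view-name prefix are assumptions.

   ```scala
   import java.util.UUID
   import org.apache.spark.sql.{DataFrame, SparkSession}

   // Register the batch under a unique temp view and rewrite "<SRC>" to that name.
   def applySqlTransform(spark: SparkSession, srcDf: DataFrame, sql: String): DataFrame = {
     val tmpView = "HOODIE_SRC_TMP_" + UUID.randomUUID().toString.replace("-", "_")
     srcDf.createOrReplaceTempView(tmpView)
     spark.sql(sql.replace("<SRC>", tmpView))
   }

   // e.g. applySqlTransform(spark, srcDf, "select x, y, z from <SRC>")
   ```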



[GitHub] [hudi] jiangok2006 commented on issue #2537: [SUPPORT] delta streamer sql transformer does not work for my kafka data

Posted by GitBox <gi...@apache.org>.
jiangok2006 commented on issue #2537:
URL: https://github.com/apache/hudi/issues/2537#issuecomment-777808336


   @nsivabalan It is not about the sql statement, I think. If I use a simpler dataframe, the sql transformer works fine with my or Sudha's sql query. But my kafka data seems to cause a deserialization issue for the sql transformer. I admit that my kafka data is special because each record is big and has complex data types (e.g. logical types). Flink 1.11 could not handle it either until two bugs were fixed in Flink 1.12:
   https://issues.apache.org/jira/browse/FLINK-19491
   https://issues.apache.org/jira/browse/FLINK-19339
   
   I guess the easier way is to provide sample avro from my kafka topic for troubleshooting. If so, I may need to work with my team on exposing sample data. Thoughts?



[GitHub] [hudi] nsivabalan commented on issue #2537: [SUPPORT] delta streamer sql transformer does not work for my kafka data

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #2537:
URL: https://github.com/apache/hudi/issues/2537#issuecomment-809492266


   @jiangjiguang : let us know once you have some sample avro. We are trying to iron out all issues related to schemas and kafka; it would be good to get this addressed as well.



[GitHub] [hudi] n3nash closed issue #2537: [SUPPORT] delta streamer sql transformer does not work for my kafka data

Posted by GitBox <gi...@apache.org>.
n3nash closed issue #2537:
URL: https://github.com/apache/hudi/issues/2537


   



[GitHub] [hudi] nsivabalan commented on issue #2537: [SUPPORT] delta streamer sql transformer does not work for my kafka data

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #2537:
URL: https://github.com/apache/hudi/issues/2537#issuecomment-774124523


   @bhasudha @vinothchandar : is this the right format for the sql query to be used with the sql-based transformer?
   ```
   select x,y,z from <SRC>
   ```
   From looking at the code, that is what I could infer. The author is trying out something like "select x,y,z from a".
   Basically, what is the table name to supply in this sql query? And do we have any documentation anywhere in this regard?



[GitHub] [hudi] nsivabalan commented on issue #2537: [SUPPORT] delta streamer sql transformer does not work for my kafka data

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #2537:
URL: https://github.com/apache/hudi/issues/2537#issuecomment-822570598


   @jiangjiguang : gentle ping. Will close this out after a week if there is no activity, but we are definitely interested in understanding the issue and fixing it if required.



[GitHub] [hudi] nsivabalan commented on issue #2537: [SUPPORT] delta streamer sql transformer does not work for my kafka data

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #2537:
URL: https://github.com/apache/hudi/issues/2537#issuecomment-779365428


   @jiangjiguang : yes, sure, that would definitely be helpful. Once you have the sample Avro, do post it here and we can take a shot at reproducing and investigating further.



[GitHub] [hudi] jiangok2006 edited a comment on issue #2537: [SUPPORT] delta streamer sql transformer does not work for my kafka data

Posted by GitBox <gi...@apache.org>.
jiangok2006 edited a comment on issue #2537:
URL: https://github.com/apache/hudi/issues/2537#issuecomment-777808336


   @nsivabalan It is not about the sql statement, I think. If I use a simple dataframe, the sql transformer works fine with my or Sudha's sql query. But my kafka data in production seems to cause a deserialization issue for the sql transformer. I admit that my kafka data is special because each record is big and has complex data types (e.g. logical types). Flink 1.11 could not handle it either until two bugs were fixed in Flink 1.12:
   https://issues.apache.org/jira/browse/FLINK-19491
   https://issues.apache.org/jira/browse/FLINK-19339
   
   I guess the easier way is to provide sample avro from my kafka topic for troubleshooting. If so, I may need to work with my team on exposing sample data. Thoughts?



[GitHub] [hudi] nsivabalan commented on issue #2537: [SUPPORT] delta streamer sql transformer does not work for my kafka data

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #2537:
URL: https://github.com/apache/hudi/issues/2537#issuecomment-774126082


   @jiangok2006 : maybe in the meantime, you can try out the sql query below.
   ```
   SELECT *, cast(a.Metadata.processingTime as long) processingTime2 FROM <SRC> a
   ```
   Also, have you used DeltaStreamer successfully before and are only now running into this issue with the sql transformer, or is this the first time you are trying out DeltaStreamer? That will give us some insight into whether we need to look at everything or just the transformer.

