You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/02/25 16:28:14 UTC

[GitHub] [hudi] t0il3ts0ap commented on issue #2589: [SUPPORT] Issue with adding column while running deltastreamer with kafka source.

t0il3ts0ap commented on issue #2589:
URL: https://github.com/apache/hudi/issues/2589#issuecomment-786029402


   Ran again on fresh table, still same issue. 
   
   SparkSubmit:
   ```
   spark-submit 
   --master yarn 
   --packages org.apache.spark:spark-avro_2.12:3.0.1,org.apache.hudi:hudi-utilities-bundle_2.12:0.7.0 
   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer 
   --conf spark.scheduler.mode=FAIR 
   --conf spark.task.maxFailures=5 
   --conf spark.rdd.compress=true 
   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer 
   --conf spark.shuffle.service.enabled=true 
   --conf spark.sql.hive.convertMetastoreParquet=false 
   --conf spark.executor.heartbeatInterval=120s 
   --conf spark.network.timeout=600s 
   --conf spark.yarn.max.executor.failures=5 
   --conf spark.sql.catalogImplementation=hive 
   --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=/home/hadoop/log4j-spark.properties" 
   --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=/home/hadoop/log4j-spark.properties" 
   --deploy-mode client s3://navi-emr-poc/delta-streamer-test/jars/deltastreamer-addons-1.0-SNAPSHOT.jar 
   --enable-sync 
   --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor 
   --hoodie-conf hoodie.parquet.compression.codec=snappy 
   --hoodie-conf partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor 
   --hoodie-conf hoodie.deltastreamer.schemaprovider.spark_avro_post_processor.enable=false 
   --hoodie-conf auto.offset.reset=latest 
   --hoodie-conf hoodie.avro.schema.validate=true
   --table-type MERGE_ON_READ 
   --source-class org.apache.hudi.utilities.sources.AvroKafkaSource 
   --schemaprovider-class org.apache.hudi.utilities.schema.NullTargetSchemaRegistryProvider  
   --props s3://navi-emr-poc/delta-streamer-test/config/kafka-source-nonprod.properties 
   --transformer-class com.navi.transform.DebeziumTransformer 
   --continuous 
   --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=https://dev-dataplatform-schema-registry.np.navi-tech.in/subjects/to_be_deleted_service.public.accounts-value/versions/latest 
   --hoodie-conf hoodie.datasource.hive_sync.database=to_be_deleted_service 
   --hoodie-conf hoodie.datasource.hive_sync.table=accounts 
   --hoodie-conf hoodie.datasource.write.recordkey.field=id 
   --hoodie-conf hoodie.datasource.write.precombine.field=__lsn 
   --hoodie-conf hoodie.datasource.write.partitionpath.field='' 
   --hoodie-conf hoodie.deltastreamer.source.kafka.topic=to_be_deleted_service.public.accounts 
   --hoodie-conf group.id=delta-streamer-to_be_deleted_service-accounts 
   --source-ordering-field __lsn 
   --target-base-path s3://navi-emr-poc/raw-data/to_be_deleted_service/accounts 
   --target-table accounts
   ```
   
   Transformer Code:
   ```
   public class DebeziumTransformer implements Transformer {
   
       public Dataset<Row> apply(JavaSparkContext javaSparkContext, SparkSession sparkSession,
           Dataset<Row> dataset, TypedProperties typedProperties) {
   
           Dataset<Row> transformedDataset = dataset
               .withColumn("__deleted", dataset.col("__deleted").cast(DataTypes.BooleanType))
               .withColumnRenamed("__deleted", "_hoodie_is_deleted")
               .drop("__op", "__source_ts_ms");
   
           log.info("TRANSFORMER SCHEMA STARTS");
           transformedDataset.printSchema();
           transformedDataset.show();
           log.info("TRANSFORMER SCHEMA ENDS");
           return transformedDataset;
       }
   }
   ```
   
   When I add the column, debezium updates the schema registry instantaneously and new records start flowing. Its possible that deltastreamer gets the new schema records before even hitting schema registry.
   
   Attaching logs:
   [alog.txt](https://github.com/apache/hudi/files/6044367/alog.txt)
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org