Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/02/24 12:18:59 UTC

[GitHub] [hudi] codejoyan edited a comment on issue #2592: [SUPPORT] Does latest versions of Hudi (0.7.0, 0.6.0) work with Spark 2.3.0 when reading orc files?

codejoyan edited a comment on issue #2592:
URL: https://github.com/apache/hudi/issues/2592#issuecomment-785036461


   Sure @bvaradar. The steps below should reproduce the issue.
   **Observation:** The write fails when the dataframe contains specific (non-string) datatypes; if all the fields are left as strings, the same write succeeds (see the string-only sketch after the stack trace below).
   
   **Environment Details:**
   Spark version in HDP - 2.3.0.2.6.5.279-2
   Hudi: hudi-spark-bundle_2.11:0.7.0
   
   **Step-1:** Create dummy data
   
   $ hdfs dfs -cat hdfs://XXXXXw00ha/user/joyan/test_data.csv
   col_1,col_2,col_3,col_4,col_5,col_6,col_7,col_8,col_9,col_10,col_11,col_12,cntry_cd,bus_dt
   7IN00716079317820210109153408,716,3,AB,INR,107667253,0,1,2021-02-1420:23:54.753,useridjsb91,2021-02-1420:23:54.753,useridjsb91,IN,'2021-02-01'
   7IN00716079317820210109153408,716,2,AB,INR,212733302,0,1,2021-02-1420:23:54.753,useridjsb91,2021-02-1420:23:54.753,useridjsb91,IN,'2021-02-01'
   7IN00716079317820210109153408,716,1,AB,INR,139224013,0,1,2021-02-1420:23:54.753,useridjsb91,2021-02-1420:23:54.753,useridjsb91,IN,'2021-02-01'
   AU700716079381819643325112243,5700,5,AB,INR,136158881,0,1,2021-02-1420:06:38.718,useridjsb91,2021-02-1420:06:38.718,useridjsb91,IN,'2021-02-02'
   AU700716079381819643325112243,5700,6,AB,INR,585710940,4,1.97,2021-02-1420:06:38.718,useridjsb91,2021-02-1420:06:38.718,useridjsb91,IN,'2021-02-02'
   AU700716079381819643325112243,5700,4,AB,INR,136158881,0,1,2021-02-1420:06:38.718,useridjsb91,2021-02-1420:06:38.718,useridjsb91,AU,'2021-02-01'
   AU700716079381819643325112243,5700,3,AB,INR,136158881,0,1,2021-02-1420:06:38.718,useridjsb91,2021-02-1420:06:38.718,useridjsb91,AU,'2021-02-01'
   AU700716079381819643325112243,5700,1,AB,INR,136158881,0,1,2021-02-1420:06:38.718,useridjsb91,2021-02-1420:06:38.718,useridjsb91,AU,'2021-02-01'
   
   **Step-2:** Read the data from the file, cast the columns to the appropriate datatypes, and save as a Hudi table
   $ spark-shell \
   --packages org.apache.hudi:hudi-spark-bundle_2.11:0.7.0,org.apache.spark:spark-avro_2.11:2.4.4,org.apache.avro:avro:1.8.2 \
   --conf spark.driver.extraClassPath=/path/org.apache.avro_avro-1.8.2.jar \
   --conf spark.executor.extraClassPath=/path/org.apache.avro_avro-1.8.2.jar \
   --conf "spark.sql.hive.convertMetastoreParquet=false" \
   --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
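
   An optional sanity check inside spark-shell (not in the original report; just to confirm the runtime matches the Environment Details above):

   // Optional: print the Spark and Scala versions the shell is actually running.
   println(spark.version)                        // expected: the HDP 2.3.0 build listed above
   println(scala.util.Properties.versionString)  // expected: version 2.11.x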
   
   import org.apache.hudi.DataSourceWriteOptions
   import org.apache.hudi.QuickstartUtils.getQuickstartWriteConfigs
   import org.apache.hudi.config.{HoodieCompactionConfig, HoodieStorageConfig, HoodieWriteConfig}
   import org.apache.spark.sql.{SaveMode, SparkSession}
   import org.apache.spark.sql.functions.{col, concat, lit}
   
   val inputDF = spark.read.format("csv").option("header", "true").load("hdfs://XXXXXw00ha/user/joyan/test_data.csv")
   
   val formattedDF = inputDF.selectExpr("col_1", "cast(col_2 as integer) col_2",
   	"cast(col_3 as short) col_3", "col_4", "col_5", "cast(col_6 as byte) col_6", "cast(col_7 as decimal(9,2)) col_7",
   	"cast(col_8 as decimal(9,2)) col_8", "cast(col_9 as timestamp) col_9", "col_10", "cast(col_11 as timestamp) col_11",
   	"col_12", "cntry_cd", "cast(bus_dt as timestamp) bus_dt")
   
   formattedDF.printSchema
   root
    |-- col_1: string (nullable = true)
    |-- col_2: integer (nullable = true)
    |-- col_3: short (nullable = true)
    |-- col_4: string (nullable = true)
    |-- col_5: string (nullable = true)
    |-- col_6: byte (nullable = true)
    |-- col_7: decimal(9,2) (nullable = true)
    |-- col_8: decimal(9,2) (nullable = true)
    |-- col_9: timestamp (nullable = true)
    |-- col_10: string (nullable = true)
    |-- col_11: timestamp (nullable = true)
    |-- col_12: string (nullable = true)
    |-- cntry_cd: string (nullable = true)
    |-- bus_dt: timestamp (nullable = true)
   
   
   formattedDF.show
   +--------------------+-----+-----+-----+-----+-----+-----+-----+-----+-----------+------+-----------+--------+------+
   |               col_1|col_2|col_3|col_4|col_5|col_6|col_7|col_8|col_9|     col_10|col_11|     col_12|cntry_cd|bus_dt|
   +--------------------+-----+-----+-----+-----+-----+-----+-----+-----+-----------+------+-----------+--------+------+
   |7IN00716079317820...|  716|    3|   AB|  INR| null| 0.00| 1.00| null|useridjsb91|  null|useridjsb91|      IN|  null|
   |7IN00716079317820...|  716|    2|   AB|  INR| null| 0.00| 1.00| null|useridjsb91|  null|useridjsb91|      IN|  null|
   |7IN00716079317820...|  716|    1|   AB|  INR| null| 0.00| 1.00| null|useridjsb91|  null|useridjsb91|      IN|  null|
   |AU700716079381819...| 5700|    5|   AB|  INR| null| 0.00| 1.00| null|useridjsb91|  null|useridjsb91|      IN|  null|
   |AU700716079381819...| 5700|    6|   AB|  INR| null| 4.00| 1.97| null|useridjsb91|  null|useridjsb91|      IN|  null|
   |AU700716079381819...| 5700|    4|   AB|  INR| null| 0.00| 1.00| null|useridjsb91|  null|useridjsb91|      AU|  null|
   |AU700716079381819...| 5700|    3|   AB|  INR| null| 0.00| 1.00| null|useridjsb91|  null|useridjsb91|      AU|  null|
   |AU700716079381819...| 5700|    1|   AB|  INR| null| 0.00| 1.00| null|useridjsb91|  null|useridjsb91|      AU|  null|
   +--------------------+-----+-----+-----+-----+-----+-----+-----+-----+-----------+------+-----------+--------+------+
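
   (Side note, incidental to the failure below: the nulls above are expected with these plain casts. The col_6 values overflow a byte, col_9/col_11 have no separator between the date and time parts, and bus_dt is wrapped in single quotes in the CSV, so the casts return null. A rough sketch of parsing them explicitly is below, assuming the pattern matches the sample rows in Step-1; it is not needed to reproduce the error.)

   import org.apache.spark.sql.functions.{regexp_replace, to_timestamp}

   // Assumed pattern for the delimiter-less timestamps in the sample data.
   val tsPattern = "yyyy-MM-ddHH:mm:ss.SSS"

   val parsedDF = inputDF.
   	withColumn("col_6", col("col_6").cast("int")).
   	withColumn("col_9", to_timestamp(col("col_9"), tsPattern)).
   	withColumn("col_11", to_timestamp(col("col_11"), tsPattern)).
   	withColumn("bus_dt", to_timestamp(regexp_replace(col("bus_dt"), "'", ""), "yyyy-MM-dd"))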
   
   val transformedDF = formattedDF.withColumn("partitionpath", concat(lit("cntry_cd="), col("cntry_cd"), lit("/bus_dt="), col("bus_dt")))
   
   val targetPath = "gs://XXXXXXXXXXX7bb3d68/test_table_tgt"
   
   transformedDF.write.format("org.apache.hudi").
   options(getQuickstartWriteConfigs).
   option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "col_9").
   option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "col_2,col_1,col_3").
   option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
   option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.ComplexKeyGenerator").
   option("hoodie.upsert.shuffle.parallelism","2").
   option("hoodie.insert.shuffle.parallelism","2").
   option(HoodieWriteConfig.TABLE_NAME, "targetTableHudi").
   mode(SaveMode.Append).
   save(targetPath)
   
   java.lang.NoSuchMethodError: org.apache.spark.sql.types.Decimal$.minBytesForPrecision()[I
     at org.apache.hudi.spark.org.apache.spark.sql.avro.SchemaConverters$.toAvroType(SchemaConverters.scala:156)
     at org.apache.hudi.spark.org.apache.spark.sql.avro.SchemaConverters$$anonfun$5.apply(SchemaConverters.scala:176)
     at org.apache.hudi.spark.org.apache.spark.sql.avro.SchemaConverters$$anonfun$5.apply(SchemaConverters.scala:174)
     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
     at org.apache.spark.sql.types.StructType.foreach(StructType.scala:99)
     at org.apache.hudi.spark.org.apache.spark.sql.avro.SchemaConverters$.toAvroType(SchemaConverters.scala:174)
     at org.apache.hudi.AvroConversionUtils$.convertStructTypeToAvroSchema(AvroConversionUtils.scala:52)
     at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:139)
     at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:134)
     at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
     at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
     at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
     at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
     at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
     at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
     at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
     at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
     at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
     at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
     at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
     at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
     at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
     ... 59 elided
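
   For contrast with the observation at the top: a rough sketch of the same write using the untouched all-string inputDF (reusing the imports, inputDF and targetPath from the session above), which succeeds in this environment as per the observation. The "_strings_only" suffix and the table name below are made up only to keep this output separate from the failing table.

   // Same write options as Step-2, but on the all-string dataframe.
   val stringOnlyDF = inputDF.withColumn("partitionpath", concat(lit("cntry_cd="), col("cntry_cd"), lit("/bus_dt="), col("bus_dt")))

   stringOnlyDF.write.format("org.apache.hudi").
   options(getQuickstartWriteConfigs).
   option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "col_9").
   option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "col_2,col_1,col_3").
   option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
   option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.ComplexKeyGenerator").
   option("hoodie.upsert.shuffle.parallelism","2").
   option("hoodie.insert.shuffle.parallelism","2").
   option(HoodieWriteConfig.TABLE_NAME, "targetTableHudiStringsOnly").
   mode(SaveMode.Append).
   save(targetPath + "_strings_only")

   Narrowing further (e.g. casting only col_7/col_8 to decimal and keeping everything else as string) may help pin down which datatype triggers the NoSuchMethodError, since the stack trace points at org.apache.spark.sql.types.Decimal$.minBytesForPrecision.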


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org