Posted to commits@hudi.apache.org by "Bharat Dighe (Jira)" <ji...@apache.org> on 2020/10/22 22:57:00 UTC

[jira] [Closed] (HUDI-1340) Not able to query real time table when rows contains nested elements

     [ https://issues.apache.org/jira/browse/HUDI-1340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bharat Dighe closed HUDI-1340.
------------------------------
    Resolution: Not A Problem

> Not able to query real time table when rows contains nested elements
> --------------------------------------------------------------------
>
>                 Key: HUDI-1340
>                 URL: https://issues.apache.org/jira/browse/HUDI-1340
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Bharat Dighe
>            Priority: Major
>         Attachments: create_avro.py, user.avsc, users1.avro, users2.avro, users3.avro, users4.avro, users5.avro
>
>
> Avro schema: attached.
> Script to generate sample data: attached.
> Sample data: attached.
> ==
> The schema has nested elements. Here is the table definition as reported by Hive:
> {code:java}
>   CREATE EXTERNAL TABLE `users_mor_rt`( 
>  `_hoodie_commit_time` string, 
>  `_hoodie_commit_seqno` string, 
>  `_hoodie_record_key` string, 
>  `_hoodie_partition_path` string, 
>  `_hoodie_file_name` string, 
>  `name` string, 
>  `userid` int, 
>  `datehired` string, 
>  `meta` struct<src:string,ingestTime:bigint>, 
>  `experience` struct<desc:string,companies:array<struct<name:string,years:int>>>) 
>  PARTITIONED BY ( 
>  `role` string) 
>  ROW FORMAT SERDE 
>  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
>  STORED AS INPUTFORMAT 
>  'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat' 
>  OUTPUTFORMAT 
>  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' 
>  LOCATION 
>  'hdfs://namenode:8020/tmp/hudi_repair_order_mor' 
>  TBLPROPERTIES ( 
>  'last_commit_time_sync'='20201011190954', 
>  'transient_lastDdlTime'='1602442906')
> {code}
> Scala code:
> {code:java}
> import java.io.File
> import org.apache.hudi.QuickstartUtils._
> import org.apache.spark.sql.SaveMode._
> import org.apache.avro.Schema
> import org.apache.hudi.DataSourceReadOptions._
> import org.apache.hudi.DataSourceWriteOptions._
> import org.apache.hudi.config.HoodieWriteConfig._
> val tableName = "users_mor"
> val basePath = "hdfs:///tmp/hudi_repair_order_mor"
> // Insert data
> // The schema file below would be read from the local filesystem, not HDFS.
> //val schema = new Schema.Parser().parse(new File("/var/hoodie/ws/docker/demo/data/user/user.avsc"))
> def updateHudi(num: String, op: String) = {
>     val path = "hdfs:///var/demo/data/user/users" + num + ".avro"
>     println(path)
>     val avdf2 = new org.apache.spark.sql.SQLContext(sc).read.format("avro").
>                 // option("avroSchema", schema.toString).
>                 load(path)
>     avdf2.select("name").show(false)
>     avdf2.write.format("hudi").
>         options(getQuickstartWriteConfigs).
>         option(OPERATION_OPT_KEY,op).
>         option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ"). // table type; the default is COPY_ON_WRITE
>         option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.ComplexKeyGenerator").
>         option(PRECOMBINE_FIELD_OPT_KEY, "meta.ingestTime").   // dedup
>         option(RECORDKEY_FIELD_OPT_KEY, "userId").           // key
>         option(PARTITIONPATH_FIELD_OPT_KEY, "role").
>         option(TABLE_NAME, tableName).
>         option("hoodie.compact.inline", false).
>         option(HIVE_STYLE_PARTITIONING_OPT_KEY, "true").
>         option(HIVE_SYNC_ENABLED_OPT_KEY, "true").
>         option(HIVE_TABLE_OPT_KEY, tableName).
>         option(HIVE_USER_OPT_KEY, "hive").
>         option(HIVE_PASS_OPT_KEY, "hive").
>         option(HIVE_URL_OPT_KEY, "jdbc:hive2://hiveserver:10000").
>         option(HIVE_PARTITION_FIELDS_OPT_KEY, "role").
>         option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor").
>         option("hoodie.datasource.hive_sync.assume_date_partitioning", "false").
>         mode(Append).
>         save(basePath)
>     // Query the real-time (_rt) and read-optimized (_ro) tables after each write.
>     spark.sql("select name, _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, experience.companies[0] from " + tableName + "_rt").show()
>     spark.sql("select name, _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, _hoodie_commit_seqno from " + tableName + "_ro").show()
> }
>     updateHudi("1", "bulkinsert")
>     updateHudi("2", "upsert")
>     updateHudi("3", "upsert")
>     updateHudi("4", "upsert")
> {code}
> If nested fields are not included, the query works fine:
> {code}
> scala> spark.sql("select name from users_mor_rt");
> res19: org.apache.spark.sql.DataFrame = [name: string]
> scala> spark.sql("select name from users_mor_rt").show();
> +---------+
> |     name|
> +---------+
> |    engg3|
> |engg1_new|
> |engg2_new|
> |     mgr1|
> |     mgr2|
> |  devops1|
> |  devops2|
> +---------+
> {code}
> But the query fails when I include the nested field 'experience':
> {code}
> scala> spark.sql("select name, experience from users_mor_rt").show();
> 20/10/11 19:53:58 ERROR executor.Executor: Exception in task 0.0 in stage 147.0 (TID 153)
> java.lang.UnsupportedOperationException: Cannot inspect org.apache.hadoop.io.Text
> 	at org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector.getStructFieldData(ArrayWritableObjectInspector.java:152)
> 	at org.apache.spark.sql.hive.HiveInspectors$$anonfun$4$$anonfun$apply$7.apply(HiveInspectors.scala:688)
> 	at org.apache.spark.sql.hive.HiveInspectors$$anonfun$unwrapperFor$41$$anonfun$apply$8.apply(HiveInspectors.scala:692)
> 	at org.apache.spark.sql.hive.HiveInspectors$$anonfun$unwrapperFor$41$$anonfun$apply$8.apply(HiveInspectors.scala:692)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> {code}
> Using Hive, I can query the same data with no issues.


