Posted to commits@hudi.apache.org by "Bharat Dighe (Jira)" <ji...@apache.org> on 2020/10/22 22:57:00 UTC
[jira] [Closed] (HUDI-1340) Not able to query real time table when rows contains nested elements
[ https://issues.apache.org/jira/browse/HUDI-1340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Bharat Dighe closed HUDI-1340.
------------------------------
Resolution: Not A Problem
> Not able to query real time table when rows contains nested elements
> --------------------------------------------------------------------
>
> Key: HUDI-1340
> URL: https://issues.apache.org/jira/browse/HUDI-1340
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: Bharat Dighe
> Priority: Major
> Attachments: create_avro.py, user.avsc, users1.avro, users2.avro, users3.avro, users4.avro, users5.avro
>
>
> Avro schema: attached (user.avsc)
> Script to generate sample data: attached (create_avro.py)
> Sample data: attached (users1.avro to users5.avro)
> ==
> The schema has nested elements. Here is the table definition as output by Hive:
> {code:java}
> CREATE EXTERNAL TABLE `users_mor_rt`(
> `_hoodie_commit_time` string,
> `_hoodie_commit_seqno` string,
> `_hoodie_record_key` string,
> `_hoodie_partition_path` string,
> `_hoodie_file_name` string,
> `name` string,
> `userid` int,
> `datehired` string,
> `meta` struct<src:string,ingestTime:bigint>,
> `experience` struct<desc:string,companies:array<struct<name:string,years:int>>>)
> PARTITIONED BY (
> `role` string)
> ROW FORMAT SERDE
> 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
> STORED AS INPUTFORMAT
> 'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'
> OUTPUTFORMAT
> 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
> LOCATION
> 'hdfs://namenode:8020/tmp/hudi_repair_order_mor'
> TBLPROPERTIES (
> 'last_commit_time_sync'='20201011190954',
> 'transient_lastDdlTime'='1602442906')
> {code}
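To make the nested shape of the `_rt` table easier to follow, here is a minimal plain-Scala model of the `meta` and `experience` column types from the DDL above. Only the field names and types come from the DDL; the case-class names (`Meta`, `Company`, `Experience`, `UserRow`), the helper `firstCompany`, and the sample values are hypothetical. `firstCompany` is an analogue of the `experience.companies[0]` expression used in the Spark query, returning `Option` because the array may be empty. It needs no Spark and can be pasted into a Scala REPL.

```scala
// Hypothetical plain-Scala model of the nested columns in users_mor_rt.
// Field names and types follow the Hive DDL; class names are illustrative only.

// meta struct<src:string,ingestTime:bigint>
final case class Meta(src: String, ingestTime: Long)

// element type of experience.companies: struct<name:string,years:int>
final case class Company(name: String, years: Int)

// experience struct<desc:string,companies:array<struct<name:string,years:int>>>
final case class Experience(desc: String, companies: Seq[Company])

// One row of the data columns (the _hoodie_* metadata columns are omitted).
final case class UserRow(
    name: String,
    userid: Int,
    datehired: String,
    meta: Meta,
    experience: Experience,
    role: String // partition column
)

// Analogue of `experience.companies[0]`: None when the array is empty.
def firstCompany(row: UserRow): Option[Company] =
  row.experience.companies.headOption

// Hypothetical sample row, NOT taken from the attached data files.
val sample = UserRow(
  name = "engg1",
  userid = 1,
  datehired = "2020-01-01",
  meta = Meta(src = "demo", ingestTime = 1602442906L),
  experience = Experience("backend", Seq(Company("acme", 3), Company("initech", 2))),
  role = "engg"
)

println(firstCompany(sample)) // prints Some(Company(acme,3))
```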
> Scala code used to load the sample data into Hudi:
> {code:java}
> import java.io.File
> import org.apache.hudi.QuickstartUtils._
> import org.apache.spark.sql.SaveMode._
> import org.apache.avro.Schema
> import org.apache.hudi.DataSourceReadOptions._
> import org.apache.hudi.DataSourceWriteOptions._
> import org.apache.hudi.config.HoodieWriteConfig._
> val tableName = "users_mor"
> val basePath = "hdfs:///tmp/hudi_repair_order_mor"
> // Insert data
> // Note: the schema file below is on the local filesystem, not HDFS
> //val schema = new Schema.Parser().parse(new File("/var/hoodie/ws/docker/demo/data/user/user.avsc"))
> def updateHudi(num: String, op: String) = {
> val path = "hdfs:///var/demo/data/user/users" + num + ".avro"
> println(path)
> val avdf2 = spark.read.format("avro").
> // option("avroSchema", schema.toString).
> load(path)
> avdf2.select("name").show(false)
> avdf2.write.format("hudi").
> options(getQuickstartWriteConfigs).
> option(OPERATION_OPT_KEY, op).
> option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ"). // COPY_ON_WRITE (default) or MERGE_ON_READ
> option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.ComplexKeyGenerator").
> option(PRECOMBINE_FIELD_OPT_KEY, "meta.ingestTime"). // dedup
> option(RECORDKEY_FIELD_OPT_KEY, "userId"). // key
> option(PARTITIONPATH_FIELD_OPT_KEY, "role").
> option(TABLE_NAME, tableName).
> option("hoodie.compact.inline", false).
> option(HIVE_STYLE_PARTITIONING_OPT_KEY, "true").
> option(HIVE_SYNC_ENABLED_OPT_KEY, "true").
> option(HIVE_TABLE_OPT_KEY, tableName).
> option(HIVE_USER_OPT_KEY, "hive").
> option(HIVE_PASS_OPT_KEY, "hive").
> option(HIVE_URL_OPT_KEY, "jdbc:hive2://hiveserver:10000").
> option(HIVE_PARTITION_FIELDS_OPT_KEY, "role").
> option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor").
> option("hoodie.datasource.hive_sync.assume_date_partitioning", "false").
> mode(Append).
> save(basePath)
> spark.sql("select name, _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, experience.companies[0] from " + tableName + "_rt").show()
> spark.sql("select name, _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, _hoodie_commit_seqno from " + tableName + "_ro").show()
> }
> updateHudi("1", "bulk_insert")
> updateHudi("2", "upsert")
> updateHudi("3", "upsert")
> updateHudi("4", "upsert")
> {code}
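Because `updateHudi` passes `op` straight to `OPERATION_OPT_KEY`, a misspelled operation name (for example `bulkinsert` instead of `bulk_insert`) is easy to miss. A small guard such as the hypothetical `checkOperation` below could be applied to `op` before it is handed to the writer. The set of accepted names is an assumption covering the commonly documented `hoodie.datasource.write.operation` values `insert`, `upsert`, `bulk_insert`, and `delete`; check it against the documentation for the Hudi version in use.

```scala
// Hypothetical guard for the `op` argument of updateHudi.
// ASSUMPTION: these are the commonly documented hoodie.datasource.write.operation
// values; newer Hudi releases may accept additional operations.
val knownOperations: Set[String] = Set("insert", "upsert", "bulk_insert", "delete")

// Normalizes `op` and fails fast with IllegalArgumentException if it is not a known value.
def checkOperation(op: String): String = {
  val normalized = op.trim.toLowerCase
  require(
    knownOperations.contains(normalized),
    s"Unknown Hudi write operation '$op'; expected one of ${knownOperations.mkString(", ")}"
  )
  normalized
}

checkOperation("upsert")          // returns "upsert"
// checkOperation("bulkinsert")   // would throw IllegalArgumentException
```

Inside `updateHudi`, the write option would then read `option(OPERATION_OPT_KEY, checkOperation(op))`, so a typo fails before any data is written.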
> If nested fields are not included in the query, it works fine:
> {code}
> scala> spark.sql("select name from users_mor_rt");
> res19: org.apache.spark.sql.DataFrame = [name: string]
> scala> spark.sql("select name from users_mor_rt").show();
> +---------+
> | name|
> +---------+
> | engg3|
> |engg1_new|
> |engg2_new|
> | mgr1|
> | mgr2|
> | devops1|
> | devops2|
> +---------+
> {code}
> But it fails when I include the nested field 'experience':
> {code}
> scala> spark.sql("select name, experience from users_mor_rt").show();
> 20/10/11 19:53:58 ERROR executor.Executor: Exception in task 0.0 in stage 147.0 (TID 153)
> java.lang.UnsupportedOperationException: Cannot inspect org.apache.hadoop.io.Text
> at org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector.getStructFieldData(ArrayWritableObjectInspector.java:152)
> at org.apache.spark.sql.hive.HiveInspectors$$anonfun$4$$anonfun$apply$7.apply(HiveInspectors.scala:688)
> at org.apache.spark.sql.hive.HiveInspectors$$anonfun$unwrapperFor$41$$anonfun$apply$8.apply(HiveInspectors.scala:692)
> at org.apache.spark.sql.hive.HiveInspectors$$anonfun$unwrapperFor$41$$anonfun$apply$8.apply(HiveInspectors.scala:692)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> {code}
> Using Hive, I can query the same data with no issues.
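The top frame of the trace is `ArrayWritableObjectInspector.getStructFieldData`, which reads a struct field by position and expects the struct value to arrive in Hive's struct representation (an `ArrayWritable`); here it received an `org.apache.hadoop.io.Text` (a string) instead. The plain-Scala model below is a simplified sketch of that check, not Hive's implementation, and it does not explain why the reader produced a `Text` value. `Value`, `StrVal`, `StructVal`, `structField`, and the two rows are hypothetical. In this model, selecting only the top-level `name` column never inspects the nested value, while drilling into `experience` hits the failing check.

```scala
// Simplified model (NOT Hive's implementation) of a positional struct-field lookup,
// loosely following the check in ArrayWritableObjectInspector.getStructFieldData.
trait Value
final case class StrVal(s: String) extends Value                 // stands in for a Text value
final case class StructVal(fields: Vector[Value]) extends Value  // stands in for an ArrayWritable

// Returns the field at `index` of a struct value; throws if `data` is not a struct.
def structField(data: Value, index: Int): Option[Value] = data match {
  case StructVal(fields) => fields.lift(index) // out-of-range index gives None (like a null)
  case other             => throw new UnsupportedOperationException(s"Cannot inspect $other")
}

// Hypothetical rows with two columns: name (string) and experience (struct<desc:string>).
val good = StructVal(Vector(StrVal("engg1"), StructVal(Vector(StrVal("backend")))))
val bad  = StructVal(Vector(StrVal("engg1"), StrVal("backend"))) // string where a struct belongs

// Selecting only `name` reads position 0 of the row struct, so both rows succeed.
println(structField(good, 0)) // Some(StrVal(engg1))
println(structField(bad, 0))  // Some(StrVal(engg1))

// Drilling into `experience` works for `good` but throws for `bad`.
println(structField(structField(good, 1).get, 0)) // Some(StrVal(backend))
// structField(structField(bad, 1).get, 0)        // UnsupportedOperationException
```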