You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "Bharat Dighe (Jira)" <ji...@apache.org> on 2020/10/11 19:56:00 UTC
[jira] [Created] (HUDI-1340) Not able to query real time table when
rows contains nested elements
Bharat Dighe created HUDI-1340:
----------------------------------
Summary: Not able to query real time table when rows contains nested elements
Key: HUDI-1340
URL: https://issues.apache.org/jira/browse/HUDI-1340
Project: Apache Hudi
Issue Type: Bug
Reporter: Bharat Dighe
Attachments: create_avro.py, user.avsc, users1.avro, users2.avro, users3.avro, users4.avro, users5.avro
Avro schema: attached (user.avsc)
Script to generate sample data: attached (create_avro.py)
Sample data: attached (users1.avro – users5.avro)
==
The schema has nested elements. Here is the table definition as shown by Hive:
{code:sql}
CREATE EXTERNAL TABLE `users_mor_rt`(
`_hoodie_commit_time` string,
`_hoodie_commit_seqno` string,
`_hoodie_record_key` string,
`_hoodie_partition_path` string,
`_hoodie_file_name` string,
`name` string,
`userid` int,
`datehired` string,
`meta` struct<src:string,ingestTime:bigint>,
`experience` struct<desc:string,companies:array<struct<name:string,years:int>>>)
PARTITIONED BY (
`role` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
'hdfs://namenode:8020/tmp/hudi_repair_order_mor'
TBLPROPERTIES (
'last_commit_time_sync'='20201011190954',
'transient_lastDdlTime'='1602442906')
{code}
Scala code used to write the data (run in spark-shell):
{code:scala}
import java.io.File
import org.apache.hudi.QuickstartUtils._
import org.apache.spark.sql.SaveMode._
import org.apache.avro.Schema
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
// Hudi table name; the script reads it back through Hive as `users_mor_rt` / `users_mor_ro`.
val tableName = "users_mor"
// Base path of the MERGE_ON_READ table on HDFS.
val basePath = "hdfs:///tmp/hudi_repair_order_mor"
// Insert data.
// NOTE: the schema file below is on the local filesystem, not HDFS. It is only needed
// if the commented-out `avroSchema` reader option in updateHudi is re-enabled.
//val schema = new Schema.Parser().parse(new File("/var/hoodie/ws/docker/demo/data/user/user.avsc"))
/**
 * Loads one generated Avro batch from HDFS and writes it into the Hudi MERGE_ON_READ
 * table `tableName` at `basePath` with Hive sync enabled. It then queries the synced
 * `_rt` (real-time) and `_ro` (read-optimized) Hive tables.
 *
 * @param num suffix of the input file, e.g. "1" reads `users1.avro`
 * @param op  Hudi write operation passed to OPERATION_OPT_KEY
 *            (e.g. BULK_INSERT_OPERATION_OPT_VAL or UPSERT_OPERATION_OPT_VAL)
 */
def updateHudi(num: String, op: String): Unit = {
  val path = s"hdfs:///var/demo/data/user/users$num.avro"
  println(path)
  // Reuse the shell's SparkSession; `new SQLContext(sc)` is deprecated since Spark 2.0
  // and resolves to the same session anyway.
  val avdf2 = spark.read.format("avro").
    // option("avroSchema", schema.toString).
    load(path)
  avdf2.select("name").show(false)
  avdf2.write.format("hudi").
    options(getQuickstartWriteConfigs).
    option(OPERATION_OPT_KEY, op).
    option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ"). // COPY_ON_WRITE (default) or MERGE_ON_READ
    option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.ComplexKeyGenerator").
    option(PRECOMBINE_FIELD_OPT_KEY, "meta.ingestTime"). // dedup ordering field (nested path)
    option(RECORDKEY_FIELD_OPT_KEY, "userId"). // record key
    option(PARTITIONPATH_FIELD_OPT_KEY, "role").
    option(TABLE_NAME, tableName).
    option("hoodie.compact.inline", false). // no inline compaction after each write
    option(HIVE_STYLE_PARTITIONING_OPT_KEY, "true").
    option(HIVE_SYNC_ENABLED_OPT_KEY, "true").
    option(HIVE_TABLE_OPT_KEY, tableName).
    option(HIVE_USER_OPT_KEY, "hive").
    option(HIVE_PASS_OPT_KEY, "hive").
    option(HIVE_URL_OPT_KEY, "jdbc:hive2://hiveserver:10000").
    option(HIVE_PARTITION_FIELDS_OPT_KEY, "role").
    option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor").
    option("hoodie.datasource.hive_sync.assume_date_partitioning", "false").
    mode(Append).
    save(basePath)
  // Real-time view, selecting a nested field (the case this report is about).
  spark.sql(s"select name, _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, experience.companies[0] from ${tableName}_rt").show()
  // Read-optimized view, top-level columns only.
  spark.sql(s"select name, _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, _hoodie_commit_seqno from ${tableName}_ro").show()
}
// Initial load via bulk insert, followed by three upsert batches.
// Hudi expects the operation value "bulk_insert" ("bulkinsert" is not recognized), so use
// the constants from the already-imported DataSourceWriteOptions._ instead of raw strings.
updateHudi("1", BULK_INSERT_OPERATION_OPT_VAL)
updateHudi("2", UPSERT_OPERATION_OPT_VAL)
updateHudi("3", UPSERT_OPERATION_OPT_VAL)
updateHudi("4", UPSERT_OPERATION_OPT_VAL)
{code}
Querying the _rt table works fine when no nested fields are selected:
{code}
scala> spark.sql("select name from users_mor_rt");
res19: org.apache.spark.sql.DataFrame = [name: string]
scala> spark.sql("select name from users_mor_rt").show();
+---------+
| name|
+---------+
| engg3|
|engg1_new|
|engg2_new|
| mgr1|
| mgr2|
| devops1|
| devops2|
+---------+
{code}
But it fails when the nested field 'experience' is included:
{code}
scala> spark.sql("select name, experience from users_mor_rt").show();
20/10/11 19:53:58 ERROR executor.Executor: Exception in task 0.0 in stage 147.0 (TID 153)
java.lang.UnsupportedOperationException: Cannot inspect org.apache.hadoop.io.Text
at org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector.getStructFieldData(ArrayWritableObjectInspector.java:152)
at org.apache.spark.sql.hive.HiveInspectors$$anonfun$4$$anonfun$apply$7.apply(HiveInspectors.scala:688)
at org.apache.spark.sql.hive.HiveInspectors$$anonfun$unwrapperFor$41$$anonfun$apply$8.apply(HiveInspectors.scala:692)
at org.apache.spark.sql.hive.HiveInspectors$$anonfun$unwrapperFor$41$$anonfun$apply$8.apply(HiveInspectors.scala:692)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
{code}
Querying the same data through Hive works without any issues.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)