Posted to commits@hudi.apache.org by "Bharat Dighe (Jira)" <ji...@apache.org> on 2020/10/11 19:56:00 UTC

[jira] [Created] (HUDI-1340) Not able to query real time table when rows contain nested elements

Bharat Dighe created HUDI-1340:
----------------------------------

             Summary: Not able to query real time table when rows contain nested elements
                 Key: HUDI-1340
                 URL: https://issues.apache.org/jira/browse/HUDI-1340
             Project: Apache Hudi
          Issue Type: Bug
            Reporter: Bharat Dighe
         Attachments: create_avro.py, user.avsc, users1.avro, users2.avro, users3.avro, users4.avro, users5.avro

Avro schema: attached

Script to generate sample data: attached

Sample data: attached

==

The schema has nested elements; here is the table definition as reported by Hive:
{code:java}
  CREATE EXTERNAL TABLE `users_mor_rt`( 
 `_hoodie_commit_time` string, 
 `_hoodie_commit_seqno` string, 
 `_hoodie_record_key` string, 
 `_hoodie_partition_path` string, 
 `_hoodie_file_name` string, 
 `name` string, 
 `userid` int, 
 `datehired` string, 
 `meta` struct<src:string,ingestTime:bigint>, 
 `experience` struct<desc:string,companies:array<struct<name:string,years:int>>>) 
 PARTITIONED BY ( 
 `role` string) 
 ROW FORMAT SERDE 
 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
 STORED AS INPUTFORMAT 
 'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat' 
 OUTPUTFORMAT 
 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' 
 LOCATION 
 'hdfs://namenode:8020/tmp/hudi_repair_order_mor' 
 TBLPROPERTIES ( 
 'last_commit_time_sync'='20201011190954', 
 'transient_lastDdlTime'='1602442906')
{code}
Scala code:
{code:java}
import java.io.File

import org.apache.hudi.QuickstartUtils._
import org.apache.spark.sql.SaveMode._
import org.apache.avro.Schema
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._


val tableName = "users_mor"
val basePath = "hdfs:///tmp/hudi_repair_order_mor"

// Insert data

// note: this schema file lives on the local filesystem, not HDFS
//val schema = new Schema.Parser().parse(new File("/var/hoodie/ws/docker/demo/data/user/user.avsc"))


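// Load one of the attached users<num>.avro files, write it to the Hudi MOR table with the
// given operation, then query the Hive-synced _rt and _ro tables via Spark SQL.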
def updateHudi( num:String, op:String) = {
    val path = "hdfs:///var/demo/data/user/users" + num + ".avro"
    println( path );

    val avdf2 =  new org.apache.spark.sql.SQLContext(sc).read.format("avro").
                // option("avroSchema", schema.toString).
                load(path)
    avdf2.select("name").show(false)

    avdf2.write.format("hudi").
        options(getQuickstartWriteConfigs).
        option(OPERATION_OPT_KEY,op).
        option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ"). // default:COPY_ON_WRITE, MERGE_ON_READ

        option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.ComplexKeyGenerator").
        option(PRECOMBINE_FIELD_OPT_KEY, "meta.ingestTime").   // dedup
        option(RECORDKEY_FIELD_OPT_KEY, "userId").           // key
        option(PARTITIONPATH_FIELD_OPT_KEY, "role").
        option(TABLE_NAME, tableName).
        option("hoodie.compact.inline", false).

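        // Hive sync options: register the table in the metastore as users_mor_ro / users_mor_rt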
        option(HIVE_STYLE_PARTITIONING_OPT_KEY, "true").
        option(HIVE_SYNC_ENABLED_OPT_KEY, "true").
        option(HIVE_TABLE_OPT_KEY, tableName).
        option(HIVE_USER_OPT_KEY, "hive").
        option(HIVE_PASS_OPT_KEY, "hive").
        option(HIVE_URL_OPT_KEY, "jdbc:hive2://hiveserver:10000").
        option(HIVE_PARTITION_FIELDS_OPT_KEY, "role").
        option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor").
        option("hoodie.datasource.hive_sync.assume_date_partitioning", "false").
        mode(Append).
        save(basePath)

        spark.sql("select name, _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, experience.companies[0] from " + tableName + "_rt").show()
        spark.sql("select name, _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, _hoodie_commit_seqno from " + tableName + "_ro").show()
}


    updateHudi("1", "bulkinsert")
    updateHudi("2", "upsert")
    updateHudi("3", "upsert")
    updateHudi("4", "upsert")
{code}

If nested fields are not included, the query works fine:
{code}
scala> spark.sql("select name from users_mor_rt");
res19: org.apache.spark.sql.DataFrame = [name: string]

scala> spark.sql("select name from users_mor_rt").show();
+---------+
|     name|
+---------+
|    engg3|
|engg1_new|
|engg2_new|
|     mgr1|
|     mgr2|
|  devops1|
|  devops2|
+---------+
{code}

But it fails when I include the nested field 'experience':
{code}
scala> spark.sql("select name, experience from users_mor_rt").show();

20/10/11 19:53:58 ERROR executor.Executor: Exception in task 0.0 in stage 147.0 (TID 153)
java.lang.UnsupportedOperationException: Cannot inspect org.apache.hadoop.io.Text
	at org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector.getStructFieldData(ArrayWritableObjectInspector.java:152)
	at org.apache.spark.sql.hive.HiveInspectors$$anonfun$4$$anonfun$apply$7.apply(HiveInspectors.scala:688)
	at org.apache.spark.sql.hive.HiveInspectors$$anonfun$unwrapperFor$41$$anonfun$apply$8.apply(HiveInspectors.scala:692)
	at org.apache.spark.sql.hive.HiveInspectors$$anonfun$unwrapperFor$41$$anonfun$apply$8.apply(HiveInspectors.scala:692)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
{code}

Using Hive directly I can query the data, including the nested fields, with no issues.
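The exact Hive statement isn't captured above; the sketch below is an assumed equivalent of the failing Spark query, run through beeline against the same users_mor_rt table:
{code:sql}
-- Assumed equivalent of the failing Spark query, run directly in Hive (e.g. via beeline).
-- Selecting the nested struct through Hive's own execution path returns rows instead of the
-- "Cannot inspect org.apache.hadoop.io.Text" error thrown from spark-shell.
SELECT name, experience FROM users_mor_rt;
{code}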


