You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/02/10 13:24:08 UTC

[GitHub] [hudi] yesemsanthoshkumar opened a new issue #4784: [SUPPORT] Partition column not appearing in spark dataframe

yesemsanthoshkumar opened a new issue #4784:
URL: https://github.com/apache/hudi/issues/4784


   **_Tips before filing an issue_**
   
   - Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)? Yes
   
   - Join the mailing list to engage in conversations and get faster support at dev-subscribe@hudi.apache.org.
   
   - If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.
   
   **Describe the problem you faced**
   I have a dataset that has a schema that looks like
   ```
   {
    "created_at": "2022-01-30T00:24:14Z",
    "city_id": "1",
    "__source_ts_ms": 123456789
    "other columns"....
   }
   ```
   This is created by Debezium CDC with NewRecordStateExtract Transform. I ran the Deltastreamer in continuous mode with CustomKeyGenerator. I could see the parquet files created with partitions specified. But when reading the hudi datasource via spark, I couldn't find the partition column in the dataframe when reading from hudi spark.
   ```
   spark-submit \
     --deploy-mode cluster \
     --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
     --conf "spark.sql.hive.convertMetastoreParquet=false" \
     --jars "/usr/lib/hudi/hudi-utilities-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar" \
     --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
     /usr/lib/hudi/hudi-utilities-bundle.jar \
     --table-type MERGE_ON_READ \
     --target-base-path <<s3://bucket/tablenameprefix>> \
     --target-table <<tablename>> \
     --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
     --source-ordering-field __source_ts_ms  \
     --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
     --op UPSERT \
     --hoodie-conf "group.id=<<consumergroupid>>" \
     --hoodie-conf "hoodie.deltastreamer.source.kafka.topic=<<kafka-topic-name>>" \
     --hoodie-conf "hoodie.embed.timeline.server=true" \
     --hoodie-conf "hoodie.compact.inline=false" \
     --hoodie-conf "hoodie.datasource.write.drop.partition.columns=false" \
     --hoodie-conf "hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator" \
     --hoodie-conf "hoodie.datasource.write.recordkey.field=id" \
     --hoodie-conf "hoodie.datasource.write.partitionpath.field=created_at:timestamp,city_id:simple" \
     --hoodie-conf "hoodie.deltastreamer.keygen.timebased.timestamp.type=DATE_STRING" \
     --hoodie-conf "hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd'T'HH:mm:ssZ" \
     --hoodie-conf "hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy-MM-dd" \
     --hoodie-conf "hoodie.deltastreamer.keygen.timebased.timezone=GMT+5:30" \
     --hoodie-conf "hoodie.datasource.write.hive_style_partitioning=true" \
     --hoodie-conf "hoodie.deltastreamer.schemaprovider.registry.url=https://myschemaregistry/mysubject/versions/1" \
     --hoodie-conf "hoodie.deltastreamer.source.dfs.root=<<s3://bucket/anotherprefix>>" \
     --hoodie-conf "auto.offset.reset=earliest" \
     --hoodie-conf "hoodie.upsert.shuffle.parallelism=2" \
     --hoodie-conf "hoodie.insert.shuffle.parallelism=2" \
     --hoodie-conf "hoodie.delete.shuffle.parallelism=2" \
     --hoodie-conf "hoodie.bulkinsert.shuffle.parallelism=2" \
     --hoodie-conf "schema.registry.url=https://myschemaregistry/" \
     --hoodie-conf "bootstrap.servers=mykafkabroker1:9092" \
     --continuous
   ```
   
   A clear and concise description of the problem.
   
   I tried to read the dataset by the following
   ```
   spark.read.format("org.apache.hudi").load("s3path")
   ```
   gives me
   ```
   id     | created_at                       | city_id | _hoodie_partition_path.   |  _hoodie_record_key
   123  | 2022-01-30T00:24:14Z | 1          | created_at=2021-10-30   |    123
   ```
   What I expected
   ```
   id    | created_at                       | city_id | _hoodie_partition_path                  | _hoodie_record_key | created_at
   123 | 2022-01-30T00:24:14Z | 1          | created_at=2021-10-30/city_id=1 | 123                              | 2021-10-30
   ```
   Maybe not the same created_at but something like **_created_at**
   
   ```
   spark
      .read
      .option("basepath", "s3://path")
      .parquet("s3://path/created_at=2022-01*/")
   ```
   and
   ```
   spark
      .read
      .option("basepath", "s3://path")
      .parquet("s3://path/created_at=2022-01*/city_id=1/")
   ```
   gives me
   
   ```
   id  | created_at    | city_id |  _hoodie_partition_path                     |  _hoodie_record_key
   1   | 2022-01-30  | 1          |  created_at=2022-01-30/hub_id=5   |  1
   ```
   Notice now there is no timestamp data in created_at
   
   My questions are as follows:
   1. Is there a way to get both partition and actual column in the dataframe? In this case, I'd like to get the date value and timestamp value in two columns.
   2. Or rename the partition column in hudi? My partition looks like created_at=yyyy-MM-dd/city_id=1/  Something like dt=yyyy-MM-dd/city_id=1/
   3. If this is an expected behaviour, how do I apply filtering with just dates on created_at?
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Create a mysql table with the following DDL
   
   ```
   CREATE TABLE `orders` (
     `id` int(11) NOT NULL AUTO_INCREMENT,
     `city_id` varchar(5) COLLATE utf8_unicode_ci NOT NULL,
     `created_at` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
     `updated_at` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
     PRIMARY KEY (`id`)
   ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci
   ```
   
   2. Create a debezium connector with the following configuration
   ```
   {
   	"connector.class": "io.debezium.connector.mysql.MySqlConnector",
   	"transforms.unwrap.delete.handling.mode": "rewrite",
   	"tasks.max": "1",
   	"database.history.kafka.topic": "hudi-history",
   	"transforms": "unwrap",
   	"tombstones.on.delete": "false",
   	"snapshot.new.tables": "parallel",
   	"database.history.skip.unparseable.ddl": "true",
   	"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
   	"value.converter": "io.confluent.connect.avro.AvroConverter",
   	"key.converter": "io.confluent.connect.avro.AvroConverter",
   	"database.user": "username",
   	"database.history.kafka.bootstrap.servers": "kafkabrokerhost:port",
   	"database.server.name": "hudi-dev",
   	"heartbeat.interval.ms": "30000",
   	"database.port": "3306",
   	"key.converter.schemas.enable": "true",
   	"value.converter.schema.registry.url": "schemaregistryhost:port",
   	"database.hostname": "dbhostnameOrIP",
   	"database.password": "password",
   	"value.converter.schemas.enable": "true",
   	"name": "hudi-dev2",
   	"transforms.unwrap.add.fields": "op,db,table,source.ts_ms,ts_ms",
   	"table.include.list": "mydb.orders",
   	"key.converter.schema.registry.url": "schemaregistryhost:port",
   	"snapshot.mode": "initial"
   }
   ```
   
   3. Run hudi delta streamer with the above configurations
   4. Read the dataset with the above methods suggested
   
   **Expected behavior**
   
   A clear and concise description of what you expected to happen.
   
   I expected the partition columns to be present inside the dataframe as well so that I can apply filtering.
   
   **Environment Description**
   
   * Hudi version : 0.7.0-amzn-0
   
   * Spark version : 3.1.1-amzn-0.1
   
   * Hive version : 3.1.2-amzn-4
   
   * Hadoop version : 3.2.1
   
   * Storage (HDFS/S3/GCS..) : s3
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   Add any other context about the problem here.
   
   **Stacktrace**
   
   ```Add the stacktrace of the error.```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on issue #4784: [SUPPORT] Partition column not appearing in spark dataframe

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #4784:
URL: https://github.com/apache/hudi/issues/4784#issuecomment-1036330505


   I could not reproduce the partitioning issue you are facing.
   
   local spark shell
   ```
   
   import java.sql.Timestamp
   import spark.implicits._
   
   import org.apache.hudi.QuickstartUtils._
   import scala.collection.JavaConversions._
   import org.apache.spark.sql.SaveMode._
   import org.apache.hudi.DataSourceReadOptions._
   import org.apache.hudi.DataSourceWriteOptions._
   import org.apache.hudi.config.HoodieWriteConfig._
   
   
   val df1 = Seq(
           ("row1", 1, "part1" ,1578283932000L ),
           ("row2", 1, "part1", 1578283942000L)
         ).toDF("row", "ppath", "preComb","eventTime")
   
   
    df1.write.format("hudi").
           options(getQuickstartWriteConfigs).
           option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
           option(RECORDKEY_FIELD_OPT_KEY, "row").
           option(PARTITIONPATH_FIELD_OPT_KEY, "preComb:simple,ppath:timestamp").
           option("hoodie.datasource.write.keygenerator.class","org.apache.hudi.keygen.CustomKeyGenerator").
           option("hoodie.deltastreamer.keygen.timebased.timestamp.type","EPOCHMILLISECONDS").
           option("hoodie.deltastreamer.keygen.timebased.output.dateformat","yyyy-MM-dd").
           option("hoodie.deltastreamer.keygen.timebased.timezone","GMT+8:00").
           option(TABLE_NAME, "timestamp_tbl4").
           mode(Overwrite).
           save("/tmp/hudi_timestamp_tbl4")
   
   
   val hudiDF4 = spark.read.format("hudi").load("/tmp/hudi_timestamp_tbl4")
   hudiDF4.registerTempTable("tbl4")
   spark.sql("describe tbl4").show()
   spark.sql("select * from tbl4 limit 3").show()
   
   ```
   
   Output
   ```
   spark.sql("select * from tbl4 limit 3").show()
   +-------------------+--------------------+------------------+----------------------+--------------------+----+-------------+-------+-----+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| row|    eventTime|preComb|ppath|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+-------------+-------+-----+
   |  20220211102107283|20220211102107283...|              row1|      part1/1970-01-01|dfc23d4b-8177-4fa...|row1|1578283932000|  part1|    0|
   |  20220211102107283|20220211102107283...|              row2|      part1/1970-01-01|dfc23d4b-8177-4fa...|row2|1578283942000|  part1|    0|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+-------------+-------+-----+
   ```
   
   specifically values for _hoodie_partition_path are 
   part1/1970-01-01
   
   2: if you disable hive style partitioning, you may not see the "fieldname=". But if you want to enable it, don't think hudi allows changing the fieldname for partition paths. 
   3: I am not sure on how to leverage partition pruning for custom key gen based tables. @xushiyan @YannByron @bhasudha : do you folks have any pointers here. 
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] yesemsanthoshkumar commented on issue #4784: [SUPPORT] Partition column not appearing in spark dataframe

Posted by GitBox <gi...@apache.org>.
yesemsanthoshkumar commented on issue #4784:
URL: https://github.com/apache/hudi/issues/4784#issuecomment-1034923557


   @nsivabalan FYI


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan edited a comment on issue #4784: [SUPPORT] Partition column not appearing in spark dataframe

Posted by GitBox <gi...@apache.org>.
nsivabalan edited a comment on issue #4784:
URL: https://github.com/apache/hudi/issues/4784#issuecomment-1036330505


   I could not reproduce the partitioning issue you are facing. I could see my partition is well formed and I could see the two original columns which i used to generate the partition col as well.
   
   local spark shell
   ```
   
   import java.sql.Timestamp
   import spark.implicits._
   
   import org.apache.hudi.QuickstartUtils._
   import scala.collection.JavaConversions._
   import org.apache.spark.sql.SaveMode._
   import org.apache.hudi.DataSourceReadOptions._
   import org.apache.hudi.DataSourceWriteOptions._
   import org.apache.hudi.config.HoodieWriteConfig._
   
   
   val df1 = Seq(
           ("row1", 1, "part1" ,1578283932000L ),
           ("row2", 1, "part1", 1578283942000L)
         ).toDF("row", "ppath", "preComb","eventTime")
   
   
    df1.write.format("hudi").
           options(getQuickstartWriteConfigs).
           option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
           option(RECORDKEY_FIELD_OPT_KEY, "row").
           option(PARTITIONPATH_FIELD_OPT_KEY, "preComb:simple,ppath:timestamp").
           option("hoodie.datasource.write.keygenerator.class","org.apache.hudi.keygen.CustomKeyGenerator").
           option("hoodie.deltastreamer.keygen.timebased.timestamp.type","EPOCHMILLISECONDS").
           option("hoodie.deltastreamer.keygen.timebased.output.dateformat","yyyy-MM-dd").
           option("hoodie.deltastreamer.keygen.timebased.timezone","GMT+8:00").
           option(TABLE_NAME, "timestamp_tbl4").
           mode(Overwrite).
           save("/tmp/hudi_timestamp_tbl4")
   
   
   val hudiDF4 = spark.read.format("hudi").load("/tmp/hudi_timestamp_tbl4")
   hudiDF4.registerTempTable("tbl4")
   spark.sql("describe tbl4").show()
   spark.sql("select * from tbl4 limit 3").show()
   
   ```
   
   Output
   ```
   spark.sql("select * from tbl4 limit 3").show()
   +-------------------+--------------------+------------------+----------------------+--------------------+----+-------------+-------+-----+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| row|    eventTime|preComb|ppath|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+-------------+-------+-----+
   |  20220211102107283|20220211102107283...|              row1|      part1/1970-01-01|dfc23d4b-8177-4fa...|row1|1578283932000|  part1|    0|
   |  20220211102107283|20220211102107283...|              row2|      part1/1970-01-01|dfc23d4b-8177-4fa...|row2|1578283942000|  part1|    0|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+-------------+-------+-----+
   ```
   
   specifically values for _hoodie_partition_path are 
   part1/1970-01-01
   
   2: if you disable hive style partitioning, you may not see the "fieldname=". But if you want to enable it, don't think hudi allows changing the fieldname for partition paths. 
   3: I am not sure on how to leverage partition pruning for custom key gen based tables. @xushiyan @YannByron @bhasudha @codope : do you folks have any pointers here. 
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan edited a comment on issue #4784: [SUPPORT] Partition column not appearing in spark dataframe

Posted by GitBox <gi...@apache.org>.
nsivabalan edited a comment on issue #4784:
URL: https://github.com/apache/hudi/issues/4784#issuecomment-1036330505


   I could not reproduce the partitioning issue you are facing. I could see my partition is well formed and I could see the two original columns which i used to generate the partition col as well.
   
   local spark shell
   ```
   
   import java.sql.Timestamp
   import spark.implicits._
   
   import org.apache.hudi.QuickstartUtils._
   import scala.collection.JavaConversions._
   import org.apache.spark.sql.SaveMode._
   import org.apache.hudi.DataSourceReadOptions._
   import org.apache.hudi.DataSourceWriteOptions._
   import org.apache.hudi.config.HoodieWriteConfig._
   
   
   val df1 = Seq(
           ("row1", 1, "part1" ,1578283932000L ),
           ("row2", 1, "part1", 1578283942000L)
         ).toDF("row", "ppath", "preComb","eventTime")
   
   
    df1.write.format("hudi").
           options(getQuickstartWriteConfigs).
           option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
           option(RECORDKEY_FIELD_OPT_KEY, "row").
           option(PARTITIONPATH_FIELD_OPT_KEY, "preComb:simple,ppath:timestamp").
           option("hoodie.datasource.write.keygenerator.class","org.apache.hudi.keygen.CustomKeyGenerator").
           option("hoodie.deltastreamer.keygen.timebased.timestamp.type","EPOCHMILLISECONDS").
           option("hoodie.deltastreamer.keygen.timebased.output.dateformat","yyyy-MM-dd").
           option("hoodie.deltastreamer.keygen.timebased.timezone","GMT+8:00").
           option(TABLE_NAME, "timestamp_tbl4").
           mode(Overwrite).
           save("/tmp/hudi_timestamp_tbl4")
   
   
   val hudiDF4 = spark.read.format("hudi").load("/tmp/hudi_timestamp_tbl4")
   hudiDF4.registerTempTable("tbl4")
   spark.sql("describe tbl4").show()
   spark.sql("select * from tbl4 limit 3").show()
   
   ```
   
   Output
   ```
   spark.sql("select * from tbl4 limit 3").show()
   +-------------------+--------------------+------------------+----------------------+--------------------+----+-------------+-------+-----+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| row|    eventTime|preComb|ppath|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+-------------+-------+-----+
   |  20220211102107283|20220211102107283...|              row1|      part1/1970-01-01|dfc23d4b-8177-4fa...|row1|1578283932000|  part1|    0|
   |  20220211102107283|20220211102107283...|              row2|      part1/1970-01-01|dfc23d4b-8177-4fa...|row2|1578283942000|  part1|    0|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+-------------+-------+-----+
   ```
   
   specifically values for _hoodie_partition_path are 
   part1/1970-01-01
   
   2: if you disable hive style partitioning, you may not see the "fieldname=". But if you want to enable it, don't think hudi allows changing the fieldname for partition paths. 
   3: I am not sure on how to leverage partition pruning for custom key gen based tables. @xushiyan @YannByron @bhasudha : do you folks have any pointers here. 
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] poolis commented on issue #4784: [SUPPORT] Partition column not appearing in spark dataframe

Posted by GitBox <gi...@apache.org>.
poolis commented on issue #4784:
URL: https://github.com/apache/hudi/issues/4784#issuecomment-1040811171


   @nsivabalan [nsivabalan](https://github.com/nsivabalan). In your example, why is the column `ppath` = `0` in the output when it was originally `1` when the DataFrame was created?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org