Posted to commits@hudi.apache.org by "Raymond Xu (Jira)" <ji...@apache.org> on 2022/03/29 18:08:00 UTC

[jira] (HUDI-1864) Support for java.time.LocalDate in TimestampBasedAvroKeyGenerator

    [ https://issues.apache.org/jira/browse/HUDI-1864 ]


    Raymond Xu deleted comment on HUDI-1864:
    ----------------------------------

was (Author: githubbot):
vaibhav-sinha commented on pull request #2923:
URL: https://github.com/apache/hudi/pull/2923#issuecomment-872778115


   @n3nash Essentially, this PR does two independent things: it changes the representation of the `timestamp` LogicalType and adds support for Date/Timestamp objects in `TimestampBasedAvroKeyGenerator`.
   
   Hence, we can continue using `long` to represent `timestamp` if the memory footprint is a concern. In that case, I would suggest representing `Date` as an `integer` so the two representations stay consistent.
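   
   For reference, this mirrors how Avro encodes the corresponding logical types: `date` annotates an `int` (days since the Unix epoch) and `timestamp-millis` annotates a `long` (milliseconds since the epoch). A minimal, self-contained sketch of both conversions (illustrative only; the class name is made up):
   
   ```java
   import java.time.Instant;
   import java.time.LocalDate;
   
   public class EpochEncodings {
     public static void main(String[] args) {
       // Avro `date` logical type: int = days since 1970-01-01.
       int dateAsInt = (int) LocalDate.parse("2021-04-21").toEpochDay();
   
       // Avro `timestamp-millis` logical type: long = millis since the epoch.
       long millisAsLong = Instant.parse("2021-04-21T00:00:00Z").toEpochMilli();
   
       System.out.println(dateAsInt + " days; " + millisAsLong + " ms");
     }
   }
   ```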


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Support for java.time.LocalDate in TimestampBasedAvroKeyGenerator
> -----------------------------------------------------------------
>
>                 Key: HUDI-1864
>                 URL: https://issues.apache.org/jira/browse/HUDI-1864
>             Project: Apache Hudi
>          Issue Type: Improvement
>            Reporter: Vaibhav Sinha
>            Assignee: sivabalan narayanan
>            Priority: Major
>              Labels: pull-request-available, query-eng, sev:high
>             Fix For: 0.12.0
>
>
> When we read data from a MySQL table that has a column of type {{Date}}, Spark represents it as an instance of {{java.time.LocalDate}}. If I try to use this column for partitioning while writing to Hudi, I get the following exception:
>  
> {code:java}
> Caused by: org.apache.hudi.exception.HoodieKeyGeneratorException: Unable to parse input partition field :2021-04-21
> 	at org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.getPartitionPath(TimestampBasedAvroKeyGenerator.java:136) ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
> 	at org.apache.hudi.keygen.CustomAvroKeyGenerator.getPartitionPath(CustomAvroKeyGenerator.java:89) ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
> 	at org.apache.hudi.keygen.CustomKeyGenerator.getPartitionPath(CustomKeyGenerator.java:64) ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
> 	at org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:62) ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
> 	at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$write$2(HoodieSparkSqlWriter.scala:160) ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
> 	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.Iterator$SliceIterator.next(Iterator.scala:271) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.Iterator.foreach(Iterator.scala:941) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.Iterator.foreach$(Iterator.scala:941) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.AbstractIterator.to(Iterator.scala:1429) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.AbstractIterator.toArray(Iterator.scala:1429) ~[scala-library-2.12.10.jar:?]
> 	at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1449) ~[spark-core_2.12-3.1.1.jar:3.1.1]
> 	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2242) ~[spark-core_2.12-3.1.1.jar:3.1.1]
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-3.1.1.jar:3.1.1]
> 	at org.apache.spark.scheduler.Task.run(Task.scala:131) ~[spark-core_2.12-3.1.1.jar:3.1.1]
> 	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) ~[spark-core_2.12-3.1.1.jar:3.1.1]
> 	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) ~[spark-core_2.12-3.1.1.jar:3.1.1]
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) ~[spark-core_2.12-3.1.1.jar:3.1.1]
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_171]
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_171]
> 	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_171]
> Caused by: org.apache.hudi.exception.HoodieNotSupportedException: Unexpected type for partition field: java.time.LocalDate
> 	at org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.getPartitionPath(TimestampBasedAvroKeyGenerator.java:208) ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
> 	at org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.getPartitionPath(TimestampBasedAvroKeyGenerator.java:134) ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
> 	at org.apache.hudi.keygen.CustomAvroKeyGenerator.getPartitionPath(CustomAvroKeyGenerator.java:89) ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
> 	at org.apache.hudi.keygen.CustomKeyGenerator.getPartitionPath(CustomKeyGenerator.java:64) ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
> 	at org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:62) ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
> 	at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$write$2(HoodieSparkSqlWriter.scala:160) ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
> 	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.Iterator$SliceIterator.next(Iterator.scala:271) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.Iterator.foreach(Iterator.scala:941) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.Iterator.foreach$(Iterator.scala:941) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.AbstractIterator.to(Iterator.scala:1429) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.AbstractIterator.toArray(Iterator.scala:1429) ~[scala-library-2.12.10.jar:?]
> 	at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1449) ~[spark-core_2.12-3.1.1.jar:3.1.1]
> 	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2242) ~[spark-core_2.12-3.1.1.jar:3.1.1]
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-3.1.1.jar:3.1.1]
> 	at org.apache.spark.scheduler.Task.run(Task.scala:131) ~[spark-core_2.12-3.1.1.jar:3.1.1]
> 	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) ~[spark-core_2.12-3.1.1.jar:3.1.1]
> 	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) ~[spark-core_2.12-3.1.1.jar:3.1.1]
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) ~[spark-core_2.12-3.1.1.jar:3.1.1]
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_171]
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_171]
> 	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_171]
> {code}
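> For context, a write along these lines exercises the failing code path (a hedged sketch only: the table name, fields, paths, and format values are illustrative, and the {{hoodie.deltastreamer.keygen.timebased.*}} keys are the 0.8.0-era names for the timestamp key generator configs):
> {code:java}
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SaveMode;
> 
> class HudiDatePartitionRepro {
>   // `df` is assumed to carry a DateType column `created_date` (e.g. read
>   // from MySQL over JDBC); Spark surfaces its values as java.time.LocalDate.
>   static void write(Dataset<Row> df) {
>     df.write().format("hudi")
>         .option("hoodie.table.name", "my_table")
>         .option("hoodie.datasource.write.recordkey.field", "id")
>         .option("hoodie.datasource.write.keygenerator.class",
>             "org.apache.hudi.keygen.CustomKeyGenerator")
>         .option("hoodie.datasource.write.partitionpath.field", "created_date:timestamp")
>         .option("hoodie.deltastreamer.keygen.timebased.timestamp.type", "DATE_STRING")
>         .option("hoodie.deltastreamer.keygen.timebased.input.dateformat", "yyyy-MM-dd")
>         .option("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyy/MM/dd")
>         .mode(SaveMode.Append)
>         .save("/tmp/hudi/my_table");
>   }
> }
> {code}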
> Currently, {{TimestampBasedAvroKeyGenerator#getPartitionPath}} supports only the following partition value types:
> {code:java}
> public String getPartitionPath(Object partitionVal) {
>     initIfNeeded();
>     long timeMs;
>     if (partitionVal instanceof Double) {
>       timeMs = convertLongTimeToMillis(((Double) partitionVal).longValue());
>     } else if (partitionVal instanceof Float) {
>       timeMs = convertLongTimeToMillis(((Float) partitionVal).longValue());
>     } else if (partitionVal instanceof Long) {
>       timeMs = convertLongTimeToMillis((Long) partitionVal);
>     } else if (partitionVal instanceof CharSequence) {
>       if (!inputFormatter.isPresent()) {
>         throw new HoodieException("Missing inputformatter. Ensure " + Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP + " config is set when timestampType is DATE_STRING or MIXED!");
>       }
>       DateTime parsedDateTime = inputFormatter.get().parseDateTime(partitionVal.toString());
>       if (this.outputDateTimeZone == null) {
>         // Use the timezone that came off the date that was passed in, if it had one
>         partitionFormatter = partitionFormatter.withZone(parsedDateTime.getZone());
>       }
>       timeMs = inputFormatter.get().parseDateTime(partitionVal.toString()).getMillis();
>     } else {
>       throw new HoodieNotSupportedException(
>           "Unexpected type for partition field: " + partitionVal.getClass().getName());
>     }
>     DateTime timestamp = new DateTime(timeMs, outputDateTimeZone);
>     String partitionPath = timestamp.toString(partitionFormatter);
>     if (encodePartitionPath) {
>       try {
>         partitionPath = URLEncoder.encode(partitionPath, StandardCharsets.UTF_8.toString());
>       } catch (UnsupportedEncodingException uoe) {
>         throw new HoodieException(uoe.getMessage(), uoe);
>       }
>     }
>     return hiveStylePartitioning ? getPartitionPathFields().get(0) + "=" + partitionPath : partitionPath;
>   }
> {code}
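> For illustration, one possible shape of the fix is an extra branch in {{getPartitionPath}} before the final {{else}} (a minimal sketch, assuming the date should be interpreted as midnight UTC; the actual implementation may pick a different convention):
> {code:java}
>     } else if (partitionVal instanceof java.time.LocalDate) {
>       // Hypothetical branch: treat the LocalDate as midnight UTC and
>       // convert days since the epoch into epoch milliseconds.
>       timeMs = ((java.time.LocalDate) partitionVal).toEpochDay() * 86400000L;
>     }
> {code}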



--
This message was sent by Atlassian Jira
(v8.20.1#820001)