Posted to commits@hudi.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2021/08/08 20:19:00 UTC

[jira] [Commented] (HUDI-1864) Support for java.time.LocalDate in TimestampBasedAvroKeyGenerator

    [ https://issues.apache.org/jira/browse/HUDI-1864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17395606#comment-17395606 ] 

ASF GitHub Bot commented on HUDI-1864:
--------------------------------------

hudi-bot edited a comment on pull request #2923:
URL: https://github.com/apache/hudi/pull/2923#issuecomment-865487972


   ## CI report:
   
   * 1e50c60ed78027605aa0f3765eb504824e041910 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=337) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Support for java.time.LocalDate in TimestampBasedAvroKeyGenerator
> -----------------------------------------------------------------
>
>                 Key: HUDI-1864
>                 URL: https://issues.apache.org/jira/browse/HUDI-1864
>             Project: Apache Hudi
>          Issue Type: Improvement
>            Reporter: Vaibhav Sinha
>            Priority: Major
>              Labels: pull-request-available, sev:high
>
> When we read data from a MySQL table that has a column of type {{Date}}, Spark represents it as an instance of {{java.time.LocalDate}}. If I try to use this column for partitioning when writing to Hudi, I get the following exception:
>  
> {code:java}
> Caused by: org.apache.hudi.exception.HoodieKeyGeneratorException: Unable to parse input partition field :2021-04-21
> 	at org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.getPartitionPath(TimestampBasedAvroKeyGenerator.java:136) ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
> 	at org.apache.hudi.keygen.CustomAvroKeyGenerator.getPartitionPath(CustomAvroKeyGenerator.java:89) ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
> 	at org.apache.hudi.keygen.CustomKeyGenerator.getPartitionPath(CustomKeyGenerator.java:64) ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
> 	at org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:62) ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
> 	at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$write$2(HoodieSparkSqlWriter.scala:160) ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
> 	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.Iterator$SliceIterator.next(Iterator.scala:271) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.Iterator.foreach(Iterator.scala:941) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.Iterator.foreach$(Iterator.scala:941) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.AbstractIterator.to(Iterator.scala:1429) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.AbstractIterator.toArray(Iterator.scala:1429) ~[scala-library-2.12.10.jar:?]
> 	at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1449) ~[spark-core_2.12-3.1.1.jar:3.1.1]
> 	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2242) ~[spark-core_2.12-3.1.1.jar:3.1.1]
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-3.1.1.jar:3.1.1]
> 	at org.apache.spark.scheduler.Task.run(Task.scala:131) ~[spark-core_2.12-3.1.1.jar:3.1.1]
> 	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) ~[spark-core_2.12-3.1.1.jar:3.1.1]
> 	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) ~[spark-core_2.12-3.1.1.jar:3.1.1]
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) ~[spark-core_2.12-3.1.1.jar:3.1.1]
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_171]
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_171]
> 	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_171]
> Caused by: org.apache.hudi.exception.HoodieNotSupportedException: Unexpected type for partition field: java.time.LocalDate
> 	at org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.getPartitionPath(TimestampBasedAvroKeyGenerator.java:208) ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
> 	at org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.getPartitionPath(TimestampBasedAvroKeyGenerator.java:134) ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
> 	at org.apache.hudi.keygen.CustomAvroKeyGenerator.getPartitionPath(CustomAvroKeyGenerator.java:89) ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
> 	at org.apache.hudi.keygen.CustomKeyGenerator.getPartitionPath(CustomKeyGenerator.java:64) ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
> 	at org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:62) ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
> 	at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$write$2(HoodieSparkSqlWriter.scala:160) ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
> 	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.Iterator$SliceIterator.next(Iterator.scala:271) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.Iterator.foreach(Iterator.scala:941) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.Iterator.foreach$(Iterator.scala:941) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.AbstractIterator.to(Iterator.scala:1429) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288) ~[scala-library-2.12.10.jar:?]
> 	at scala.collection.AbstractIterator.toArray(Iterator.scala:1429) ~[scala-library-2.12.10.jar:?]
> 	at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1449) ~[spark-core_2.12-3.1.1.jar:3.1.1]
> 	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2242) ~[spark-core_2.12-3.1.1.jar:3.1.1]
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-3.1.1.jar:3.1.1]
> 	at org.apache.spark.scheduler.Task.run(Task.scala:131) ~[spark-core_2.12-3.1.1.jar:3.1.1]
> 	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) ~[spark-core_2.12-3.1.1.jar:3.1.1]
> 	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) ~[spark-core_2.12-3.1.1.jar:3.1.1]
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) ~[spark-core_2.12-3.1.1.jar:3.1.1]
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_171]
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_171]
> 	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_171]
> {code}
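> For context, a minimal write that exercises this code path might look like the sketch below (illustrative only; the table, column, and path names are hypothetical, and it assumes an active SparkSession {{spark}}):
> {code:java}
> // Hypothetical reproduction sketch: read a MySQL table whose `created_date`
> // column is DATE, then write to Hudi partitioned by that column through the
> // timestamp-based key generator. Assumes imports of
> // org.apache.spark.sql.{Dataset, Row, SaveMode}.
> Dataset<Row> df = spark.read()
>     .format("jdbc")
>     .option("url", "jdbc:mysql://localhost:3306/test")
>     .option("dbtable", "orders")
>     .load();
> 
> df.write()
>     .format("hudi")
>     .option("hoodie.table.name", "orders")
>     .option("hoodie.datasource.write.recordkey.field", "id")
>     .option("hoodie.datasource.write.partitionpath.field", "created_date:TIMESTAMP")
>     .option("hoodie.datasource.write.keygenerator.class",
>         "org.apache.hudi.keygen.CustomKeyGenerator")
>     .option("hoodie.deltastreamer.keygen.timebased.timestamp.type", "DATE_STRING")
>     .option("hoodie.deltastreamer.keygen.timebased.input.dateformat", "yyyy-MM-dd")
>     .option("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyy/MM/dd")
>     .mode(SaveMode.Append)
>     .save("/tmp/hudi/orders");
> {code}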
> Currently, the key generator supports only the following partition value types:
> {code:java}
> public String getPartitionPath(Object partitionVal) {
>     initIfNeeded();
>     long timeMs;
>     if (partitionVal instanceof Double) {
>       timeMs = convertLongTimeToMillis(((Double) partitionVal).longValue());
>     } else if (partitionVal instanceof Float) {
>       timeMs = convertLongTimeToMillis(((Float) partitionVal).longValue());
>     } else if (partitionVal instanceof Long) {
>       timeMs = convertLongTimeToMillis((Long) partitionVal);
>     } else if (partitionVal instanceof CharSequence) {
>       if (!inputFormatter.isPresent()) {
>         throw new HoodieException("Missing inputformatter. Ensure " + Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP + " config is set when timestampType is DATE_STRING or MIXED!");
>       }
>       DateTime parsedDateTime = inputFormatter.get().parseDateTime(partitionVal.toString());
>       if (this.outputDateTimeZone == null) {
>         // Use the timezone that came off the date that was passed in, if it had one
>         partitionFormatter = partitionFormatter.withZone(parsedDateTime.getZone());
>       }
>       timeMs = inputFormatter.get().parseDateTime(partitionVal.toString()).getMillis();
>     } else {
>       throw new HoodieNotSupportedException(
>           "Unexpected type for partition field: " + partitionVal.getClass().getName());
>     }
>     DateTime timestamp = new DateTime(timeMs, outputDateTimeZone);
>     String partitionPath = timestamp.toString(partitionFormatter);
>     if (encodePartitionPath) {
>       try {
>         partitionPath = URLEncoder.encode(partitionPath, StandardCharsets.UTF_8.toString());
>       } catch (UnsupportedEncodingException uoe) {
>         throw new HoodieException(uoe.getMessage(), uoe);
>       }
>     }
>     return hiveStylePartitioning ? getPartitionPathFields().get(0) + "=" + partitionPath : partitionPath;
>   }
> {code}
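> A minimal sketch of one way a {{java.time.LocalDate}} branch could be added to the chain above (an illustration only, not necessarily the approach taken in PR #2923):
> {code:java}
> } else if (partitionVal instanceof java.time.LocalDate) {
>   // Assumption: anchor the date at UTC midnight, since a LocalDate carries
>   // no time zone or time-of-day of its own.
>   timeMs = ((java.time.LocalDate) partitionVal)
>       .atStartOfDay(java.time.ZoneOffset.UTC)
>       .toInstant()
>       .toEpochMilli();
> }
> {code}
> Depending on configuration, Spark may instead hand the key generator a {{java.sql.Date}}, so a complete fix would likely need to cover both representations.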



--
This message was sent by Atlassian Jira
(v8.3.4#803005)