Posted to commits@hudi.apache.org by "Jing Zhang (Jira)" <ji...@apache.org> on 2023/02/13 03:15:00 UTC

[jira] [Commented] (HUDI-5770) Plan error when partition column is timestamp type and SQL query contains filter condition which contains partition

    [ https://issues.apache.org/jira/browse/HUDI-5770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17687687#comment-17687687 ] 

Jing Zhang commented on HUDI-5770:
----------------------------------

The cause of this bug is similar to [HUDI-4601|https://issues.apache.org/jira/browse/HUDI-4601].

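If the partition column is of timestamp type, the value in the partition path is not the real column value, because it is converted (formatted) from the real value. For example, the stack trace in the issue shows the partition value '1970010100', which the Flink planner then fails to parse back as a timestamp.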
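A minimal sketch of the mismatch, using only `java.time`. It assumes the partition path encodes the timestamp as `yyyyMMddHH` (inferred from the value '1970010100' in the stack trace) and uses the simplified pattern `yyyy-MM-dd HH:mm:ss` as a stand-in for the timestamp format that Flink's `DateTimeUtils.parseTimestampData` expects. The class and method names are hypothetical and not part of Hudi or Flink.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

// Hypothetical illustration class; not part of Hudi or Flink.
public class PartitionValueMismatch {
    // Assumption: the partition path encodes the timestamp as yyyyMMddHH,
    // inferred from the value '1970010100' in the reported stack trace.
    static final DateTimeFormatter PARTITION_PATH_FORMAT =
        DateTimeFormatter.ofPattern("yyyyMMddHH");

    // Simplified stand-in for the SQL timestamp format the planner parses with.
    static final DateTimeFormatter SQL_TIMESTAMP_FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Converts the real column value into the string stored in the partition path.
    // Minutes and seconds are dropped, so the real value cannot be recovered.
    static String toPartitionValue(LocalDateTime ts) {
        return PARTITION_PATH_FORMAT.format(ts);
    }

    // Returns the parse error index when the partition value is parsed as a
    // SQL timestamp, or -1 if parsing unexpectedly succeeds.
    static int parseAsSqlTimestamp(String partitionValue) {
        try {
            LocalDateTime.parse(partitionValue, SQL_TIMESTAMP_FORMAT);
            return -1;
        } catch (DateTimeParseException e) {
            return e.getErrorIndex();
        }
    }

    public static void main(String[] args) {
        LocalDateTime realValue = LocalDateTime.of(1970, 1, 1, 0, 0, 1);
        String partitionValue = toPartitionValue(realValue);
        System.out.println(partitionValue);                      // 1970010100
        System.out.println(parseAsSqlTimestamp(partitionValue)); // 0
    }
}
```

The strict `yyyy` field consumes all ten digits as a year and then rejects them, which is why the failure is reported at index 0, matching the message in the issue.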

We therefore need to handle partition columns of timestamp/date type specially when applying partition pruning, instead of parsing their partition path values directly as the column type.
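One possible direction, sketched under the same assumption that partition paths use the `yyyyMMddHH` format: convert the filter literal the same way the partition value was converted and compare the converted strings, rather than parsing each partition path back into a timestamp. This is a hypothetical illustration (`TimestampPartitionPrune` and `pruneForEquality` are made-up names), not the actual Hudi fix, and it only covers equality predicates.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch; not Hudi's actual fix.
public class TimestampPartitionPrune {
    // Assumption: same yyyyMMddHH format as the stored partition values.
    static final DateTimeFormatter PARTITION_PATH_FORMAT =
        DateTimeFormatter.ofPattern("yyyyMMddHH");

    // Keeps only the partitions that may hold rows matching `ts = literal`.
    // The literal is converted with the partition format instead of parsing
    // each partition path back as a full timestamp.
    static List<String> pruneForEquality(List<String> partitionValues, LocalDateTime literal) {
        String target = PARTITION_PATH_FORMAT.format(literal);
        return partitionValues.stream()
            .filter(target::equals)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> partitions = Arrays.asList("1970010100", "1970010101", "1970010200");
        LocalDateTime literal = LocalDateTime.parse("1970-01-01T00:00:01");
        System.out.println(pruneForEquality(partitions, literal)); // [1970010100]
    }
}
```

The surviving partition still has to be scanned with the original predicate, because the hour bucket `1970010100` also holds rows with other minutes and seconds. Range predicates would need to compare against each bucket's time bounds rather than use string equality.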

 

> Plan error when partition column is timestamp type and SQL query contains filter condition which contains partition
> -------------------------------------------------------------------------------------------------------------------
>
>                 Key: HUDI-5770
>                 URL: https://issues.apache.org/jira/browse/HUDI-5770
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: flink-sql
>            Reporter: Jing Zhang
>            Priority: Major
>
> If a Hudi table is partitioned and the partition column is of timestamp type,
> running a Flink query with a filter condition on the partition column throws an error during plan generation.
> {code:java}
> java.time.format.DateTimeParseException: Text '1970010100' could not be parsed at index 0
>     at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
>     at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
>     at org.apache.flink.table.utils.DateTimeUtils.parseTimestampData(DateTimeUtils.java:413)
>     at org.apache.flink.table.planner.plan.utils.PartitionPruner$.convertPartitionFieldValue(PartitionPruner.scala:182)
>     at org.apache.flink.table.planner.plan.utils.PartitionPruner$.$anonfun$convertPartitionToRow$1(PartitionPruner.scala:157)
>     at org.apache.flink.table.planner.plan.utils.PartitionPruner$.$anonfun$convertPartitionToRow$1$adapted(PartitionPruner.scala:155)
>     at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
>     at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:194)
>     at org.apache.flink.table.planner.plan.utils.PartitionPruner$.convertPartitionToRow(PartitionPruner.scala:155)
>     at org.apache.flink.table.planner.plan.utils.PartitionPruner$.$anonfun$prunePartitions$1(PartitionPruner.scala:137)
>     at org.apache.flink.table.planner.plan.utils.PartitionPruner$.$anonfun$prunePartitions$1$adapted(PartitionPruner.scala:132)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at org.apache.flink.table.planner.plan.utils.PartitionPruner$.prunePartitions(PartitionPruner.scala:132)
>     at org.apache.flink.table.planner.plan.utils.PartitionPruner.prunePartitions(PartitionPruner.scala)
>     at org.apache.flink.table.planner.plan.rules.logical.PushPartitionIntoTableSourceScanRule.lambda$onMatch$3(PushPartitionIntoTableSourceScanRule.java:163)
>     at org.apache.flink.table.planner.plan.rules.logical.PushPartitionIntoTableSourceScanRule.readPartitionsAndPrune(PushPartitionIntoTableSourceScanRule.java:254)
>     at org.apache.flink.table.planner.plan.rules.logical.PushPartitionIntoTableSourceScanRule.onMatch(PushPartitionIntoTableSourceScanRule.java:172)
>     at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
>     at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
>     at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
>     at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
>     at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>     at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
>     at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:64)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:78)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59)
>     at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
>     at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51)
>     at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
>     at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
>     at scala.collection.immutable.Range.foreach(Range.scala:155)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59)
>     at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
>     at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51)
>     at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
>     at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
>     at scala.collection.immutable.Range.foreach(Range.scala:155)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
>     at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
>     at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
>     at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:93)
>     at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58)
>     at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:45)
>     at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:45)
>     at scala.collection.immutable.List.foreach(List.scala:388)
>     at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
>     at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
>     at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1733)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:874)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1382)
>     at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:475)
>     at org.apache.hudi.table.ITTestHoodieDataSource.lambda$testMergeOnReadCompactionWithTimestampPartitioning$31(ITTestHoodieDataSource.java:1229)
>     at org.apache.flink.util.CollectionUtil.iterableToList(CollectionUtil.java:101)
>     at org.apache.hudi.table.ITTestHoodieDataSource.testMergeOnReadCompactionWithTimestampPartitioning(ITTestHoodieDataSource.java:1228)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
>     at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>     at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>     at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
>     at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
>     at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
>     at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
>     at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
>     at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
>     at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>     at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
>     at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
>     at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
>     at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
>     at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
>     at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
>     at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
>     at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
>     at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
>     at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
>     at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
>     at java.util.ArrayList.forEach(ArrayList.java:1257)
>     at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
>     at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
>     at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
>     at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
>     at java.util.ArrayList.forEach(ArrayList.java:1257)
>     at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
>     at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
>     at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
>     at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
>     at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
>     at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
>     at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
>     at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
>     at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
>     at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
>     at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
>     at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
>     at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
>     at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
>     at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)
>     at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>     at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
>     at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58) {code}
> This bug can be reproduced by adding the following test to `org.apache.hudi.table.ITTestHoodieDataSource`:
> {code:java}
> @Test
> void testStaticPartitionPruneWithTimestampPartitioning() {
>   TableEnvironment tableEnv = batchTableEnv;
>   String hoodieTableDDL = sql("t1")
>       .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
>       .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
>       .option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1)
>       .option(FlinkOptions.COMPACTION_TASKS, 1)
>       .partitionField("ts") // ts is the TIMESTAMP partition column
>       .end();
>   tableEnv.executeSql(hoodieTableDDL);
>   execInsertSql(tableEnv, TestSQL.INSERT_T1);
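>   // The filter on the partition column triggers partition pruning at plan time, where the exception is thrown.
>   List<Row> rows = CollectionUtil.iterableToList(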
>       () -> tableEnv.sqlQuery(
>           "select * from t1 where ts = TIMESTAMP '1970-01-01 00:00:01'")
>           .execute().collect());
>   assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)