You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Alexey Kudinkin (Jira)" <ji...@apache.org> on 2022/10/21 23:54:00 UTC

[jira] [Comment Edited] (SPARK-40876) Spark's Vectorized ParquetReader should support type promotions

    [ https://issues.apache.org/jira/browse/SPARK-40876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17622518#comment-17622518 ] 

Alexey Kudinkin edited comment on SPARK-40876 at 10/21/22 11:53 PM:
--------------------------------------------------------------------

> This is because Parquet Int32, which is a physical type in Parquet, can represent byte/short etc, using additional logical types. See [here|https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#numeric-types] for more info. Based on the logical types, Spark converts Parquet type to different Spark types, such as {{{}ByteType{}}}, {{{}ShortType{}}}, etc.

Right, i totally forgot that Parquet doesn't have integers smaller than Int32.


was (Author: alexey.kudinkin):
> This is because Parquet Int32, which is a physical type in Parquet, can represent byte/short etc, using additional logical types. See [here|https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#numeric-types] for more info. Based on the logical types, Spark converts Parquet type to different Spark types, such as {{{}ByteType{}}}, {{{}ShortType{}}}, etc.

Right, i totally forgot that Parquet doesn't have smaller than Int32.

> Spark's Vectorized ParquetReader should support type promotions
> ---------------------------------------------------------------
>
>                 Key: SPARK-40876
>                 URL: https://issues.apache.org/jira/browse/SPARK-40876
>             Project: Spark
>          Issue Type: Improvement
>          Components: Input/Output
>    Affects Versions: 3.3.0
>            Reporter: Alexey Kudinkin
>            Priority: Major
>
> Currently, when reading Parquet table using Spark's `VectorizedColumnReader`, we hit an issue where we specify requested (projection) schema where one of the field's type is widened from int32 to long.
> Expectation is that since this is totally legitimate primitive type promotion, we should be able to read Ints into Longs w/ no problems (for ex, Avro is able to do that perfectly fine).
> However, we're facing an issue where `ParquetVectorUpdaterFactory.getUpdater` method fails w/ the exception listed below.
> Looking at the code, It actually seems to be allowing the opposite – it allows to "down-size" Int32s persisted in the Parquet to be read as Bytes or Shorts for ex. I'm actually not sure what's the rationale for this behavior, and this actually seems like a bug to me (as this will essentially be leading to data truncation):
> {code:java}
> case INT32:
>   if (sparkType == DataTypes.IntegerType || canReadAsIntDecimal(descriptor, sparkType)) {
>     return new IntegerUpdater();
>   } else if (sparkType == DataTypes.LongType && isUnsignedIntTypeMatched(32)) {
>     // In `ParquetToSparkSchemaConverter`, we map parquet UINT32 to our LongType.
>     // For unsigned int32, it stores as plain signed int32 in Parquet when dictionary
>     // fallbacks. We read them as long values.
>     return new UnsignedIntegerUpdater();
>   } else if (sparkType == DataTypes.ByteType) {
>     return new ByteUpdater();
>   } else if (sparkType == DataTypes.ShortType) {
>     return new ShortUpdater();
>   } else if (sparkType == DataTypes.DateType) {
>     if ("CORRECTED".equals(datetimeRebaseMode)) {
>       return new IntegerUpdater();
>     } else {
>       boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
>       return new IntegerWithRebaseUpdater(failIfRebase);
>     }
>   }
>   break; {code}
> Exception:
> {code:java}
> at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
>     at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
>     at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
>     at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>     at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
>     at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
>     at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
>     at scala.Option.foreach(Option.scala:407)
>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
>     at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
>     at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
>     at org.apache.spark.RangePartitioner$.sketch(Partitioner.scala:304)
>     at org.apache.spark.RangePartitioner.<init>(Partitioner.scala:171)
>     at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:293)
>     at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency$lzycompute(ShuffleExchangeExec.scala:173)
>     at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency(ShuffleExchangeExec.scala:167)
>     at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:143)
>     at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:139)
>     at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.$anonfun$submitShuffleJob$1(ShuffleExchangeExec.scala:68)
>     at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219)
>     at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob(ShuffleExchangeExec.scala:68)
>     at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob$(ShuffleExchangeExec.scala:67)
>     at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.submitShuffleJob(ShuffleExchangeExec.scala:115)
>     at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture$lzycompute(QueryStageExec.scala:170)
>     at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture(QueryStageExec.scala:170)
>     at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.doMaterialize(QueryStageExec.scala:172)
>     at org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:82)
>     at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5(AdaptiveSparkPlanExec.scala:256)
>     at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5$adapted(AdaptiveSparkPlanExec.scala:254)
>     at scala.collection.Iterator.foreach(Iterator.scala:941)
>     at scala.collection.Iterator.foreach$(Iterator.scala:941)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:254)
>     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>     at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:226)
>     at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:365)
>     at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:338)
>     at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3715)
>     at org.apache.spark.sql.Dataset.$anonfun$collectAsList$1(Dataset.scala:2983)
>     at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
>     at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
>     at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
>     at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
>     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>     at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>     at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
>     at org.apache.spark.sql.Dataset.collectAsList(Dataset.scala:2982)
>     at org.apache.hudi.functional.TestBasicSchemaEvolution.loadTable$1(TestBasicSchemaEvolution.scala:142)
>     at org.apache.hudi.functional.TestBasicSchemaEvolution.testBasicSchemaEvolution(TestBasicSchemaEvolution.scala:325)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
>     at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>     at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>     at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
>     at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
>     at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:92)
>     at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
>     at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
>     at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
>     at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>     at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
>     at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
>     at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
>     at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
>     at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
>     at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
>     at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
>     at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
>     at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
>     at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
>     at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
>     at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask$DefaultDynamicTestExecutor.execute(NodeTestTask.java:212)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask$DefaultDynamicTestExecutor.execute(NodeTestTask.java:192)
>     at org.junit.jupiter.engine.descriptor.TestTemplateTestDescriptor.execute(TestTemplateTestDescriptor.java:139)
>     at org.junit.jupiter.engine.descriptor.TestTemplateTestDescriptor.lambda$execute$2(TestTemplateTestDescriptor.java:107)
>     at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>     at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>     at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>     at java.util.stream.ReferencePipeline$11$1.accept(ReferencePipeline.java:440)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>     at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>     at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>     at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>     at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
>     at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>     at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>     at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>     at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
>     at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
>     at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>     at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>     at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
>     at org.junit.jupiter.engine.descriptor.TestTemplateTestDescriptor.execute(TestTemplateTestDescriptor.java:107)
>     at org.junit.jupiter.engine.descriptor.TestTemplateTestDescriptor.execute(TestTemplateTestDescriptor.java:42)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
>     at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
>     at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
>     at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
>     at java.util.ArrayList.forEach(ArrayList.java:1259)
>     at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
>     at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
>     at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
>     at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
>     at java.util.ArrayList.forEach(ArrayList.java:1259)
>     at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
>     at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
>     at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
>     at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
>     at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
>     at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
>     at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
>     at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
>     at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
>     at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
>     at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
>     at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
>     at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
>     at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
>     at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
>     at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:57)
>     at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
>     at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
>     at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
>     at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
>     at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
> Caused by: org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file file:///var/folders/kb/cnff55vj041g2nnlzs5ylqk00000gn/T/junit929824667247538999/dataset/1/58fa0868-2c09-4e51-8a68-d4a7cf5aca4c-0_0-0-0_20221021152854772.parquet. Column: [timestamp], Expected: bigint, Found: INT32
>     at org.apache.spark.sql.errors.QueryExecutionErrors$.unsupportedSchemaColumnConvertError(QueryExecutionErrors.scala:570)
>     at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:195)
>     at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:104)
>     at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:522)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
>     at org.apache.spark.util.random.SamplingUtils$.reservoirSampleAndCount(SamplingUtils.scala:41)
>     at org.apache.spark.RangePartitioner$.$anonfun$sketch$1(Partitioner.scala:306)
>     at org.apache.spark.RangePartitioner$.$anonfun$sketch$1$adapted(Partitioner.scala:304)
>     at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
>     at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>     at org.apache.spark.scheduler.Task.run(Task.scala:131)
>     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
>     at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.constructConvertNotSupportedException(ParquetVectorUpdaterFactory.java:1104)
>     at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.getUpdater(ParquetVectorUpdaterFactory.java:181)
>     at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:161)
>     at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:298)
>     at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:196)
>     at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
>     at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:104)
>     at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:191)
>     ... 24 more 
> {code}
>  
> Toy schema of the Parquet file we're trying to read:
> {code:java}
> message spark_schema {
>   required binary first_name (STRING);
>   optional binary last_name (STRING);
>   optional int32 timestamp;
>   optional int32 partition;
> }
>  {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org