You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "xuyangzhong (Jira)" <ji...@apache.org> on 2021/09/15 07:46:00 UTC

[jira] [Created] (FLINK-24291) Decimal precision is lost when deserializing

xuyangzhong created FLINK-24291:
-----------------------------------

             Summary: Decimal precision is lost when deserializing
                 Key: FLINK-24291
                 URL: https://issues.apache.org/jira/browse/FLINK-24291
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
            Reporter: xuyangzhong


When added the test case following into FileSystemItCaseBase:
{code:java}
// create table
tableEnv.executeSql(
  s"""
     |create table test2 (
     |  c0 decimal(10,0), c1 int
     |) with (
     |  'connector' = 'filesystem',
     |  'path' = '/Users/zhongxuyang/test/test',
     |  'format' = 'testcsv'
     |)
   """.stripMargin
)

//test file content is:
//2113554011,1
//2113554022,2
{code}
and
{code:java}
// select sql
@Test
def myTest2(): Unit={
  check(
    "SELECT c0 FROM test2",
    Seq(
      row(2113554011),
      row(2113554022)
    ))
}
{code}
i got an exception :

java.lang.RuntimeException: Failed to fetch next resultjava.lang.RuntimeException: Failed to fetch next result at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109) at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370) at java.util.Iterator.forEachRemaining(Iterator.java:115) at org.apache.flink.util.CollectionUtil.iteratorToList(CollectionUtil.java:109) at org.apache.flink.table.planner.runtime.utils.BatchTestBase.executeQuery(BatchTestBase.scala:300) at org.apache.flink.table.planner.runtime.utils.BatchTestBase.check(BatchTestBase.scala:140) at org.apache.flink.table.planner.runtime.utils.BatchTestBase.checkResult(BatchTestBase.scala:106) at org.apache.flink.table.planner.runtime.batch.sql.BatchFileSystemITCaseBase.check(BatchFileSystemITCaseBase.scala:46) at org.apache.flink.table.planner.runtime.FileSystemITCaseBase$class.myTest2(FileSystemITCaseBase.scala:128) at org.apache.flink.table.planner.runtime.batch.sql.BatchFileSystemITCaseBase.myTest2(BatchFileSystemITCaseBase.scala:33) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230) at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)Caused by: java.io.IOException: Failed to fetch job execution result at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:184) at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121) at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) ... 43 moreCaused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182) ... 45 moreCaused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628) at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996) at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:134) at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:181) ... 45 moreCaused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679) at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at akka.actor.Actor.aroundReceive(Actor.scala:537) at akka.actor.Actor.aroundReceive$(Actor.scala:535) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) at akka.actor.ActorCell.invoke(ActorCell.scala:548) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) at akka.dispatch.Mailbox.run(Mailbox.scala:231) at akka.dispatch.Mailbox.exec(Mailbox.scala:243) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)Caused by: java.lang.AssertionError at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeDecimal(AbstractBinaryWriter.java:166) at org.apache.flink.table.data.writer.BinaryRowWriter.writeDecimal(BinaryRowWriter.java:27) at org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:144) at org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:204) at org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:103) at org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:48) at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:168) at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46) at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.serializeRecord(RecordWriter.java:130) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104) at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:103) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418) at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513) at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103) at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:116) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:73) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)Process finished with exit code 255

 This is cause by the unexpected error in _AbstractBinaryWriter.writeDecimal.assert._ When i debug here, i found that the expected value in file 2113554011 will be read to 2113554011.000000000000000000, the precision of which is 38  and cases the following judge is false. (value.precision() is 38 and precision is 10)
{code:java}
assert value == null || (value.precision() == precision)
{code}
 I think this is because when the source reads values from fileSystem, it will treat the decimal  as BigDecimal, and doesn't convert it to the precision we wished. Maybe it's a bug.

 

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)