You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Jingsong Lee (Jira)" <ji...@apache.org> on 2022/06/14 09:21:00 UTC

[jira] [Commented] (FLINK-28041) table store cannot distinguish filesystem Scheme when system have 'hadoop classpath'

    [ https://issues.apache.org/jira/browse/FLINK-28041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17554007#comment-17554007 ] 

Jingsong Lee commented on FLINK-28041:
--------------------------------------

Thanks [~chaojipaopao] for your reporting.

I think we should enrich the path to a URI for making sure a consistent default scheme.

> table store cannot  distinguish  filesystem Scheme when system have 'hadoop classpath'
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-28041
>                 URL: https://issues.apache.org/jira/browse/FLINK-28041
>             Project: Flink
>          Issue Type: Bug
>          Components: Table Store
>    Affects Versions: table-store-0.1.0, table-store-0.2.0, table-store-0.1.1
>         Environment: flink 1.15.0
> flink-table-store 0.1.0
> hadoop 2.6.5
>            Reporter: yutao
>            Priority: Major
>              Labels: easyfix
>
> when using flink-table-store Quick Start 
> {code:java}
> //step5: 
> SET 'table-store.path' = '/tmp/table_store'; {code}
> then write data submit the  insert sql  to the cluster
> {code:java}
> //代码占位符
> INSERT INTO word_count SELECT word, COUNT(*) FROM word_table GROUP BY word; {code}
> wecan see the job failed ;
> the log like this:
> java.io.IOException: Could not perform checkpoint 1 for operator Writer -> Local Committer (1/1)#0.
> at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1210)
> at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
> at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
> at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
> at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)
> at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
> at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
> at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
> at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
> at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
> at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
> at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: java.lang.RuntimeException: java.io.FileNotFoundException: File does not exist: /tmp/table_store/default_catalog.catalog/default_database.db/ word_count/bucket-0/sst-795c7ecf-40d9-433a-8a49-81336940be7a-0
> at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66)
> at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2157)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:2127)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:2040)
> at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:583)
> at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getBlockLocations(AuthorizationProviderProxyClientProtocol.java:94)
> at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:377)
> at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2278)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2274)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2272)
> at org.apache.flink.table.store.connector.sink.StoreSinkWriter.prepareCommit(StoreSinkWriter.java:172)
> at org.apache.flink.table.store.connector.sink.StoreSinkWriter.prepareCommit(StoreSinkWriter.java:51)
> at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emitCommittables(SinkWriterOperator.java:196)
> at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:166)
> at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
> at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:300)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$12(StreamTask.java:1253)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1241)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1198)
> ... 22 more
>  
> *the program can distinguish filesystem*
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)