Posted to user@flink.apache.org by Paul Lam <pa...@gmail.com> on 2020/07/21 08:02:17 UTC

FileNotFoundException when writing Hive orc tables

Hi,

I'm doing a POC on the Hive connector and found that when writing ORC-format Hive tables, the job fails with a FileNotFoundException right after it starts ingesting data (full stack trace at the bottom of this mail).

The error reproduces consistently in my environment: Hadoop 2.6.5 (CDH-5.6.0), Hive 1.1.0 (CDH-5.6.0), and Flink 1.11.0. It only happens with ORC tables; other bulk formats are fine.

Does anyone have an idea about this error? Any comments or suggestions are appreciated. Thanks!

Stacktrace:

Caused by: java.io.FileNotFoundException: File does not exist: hdfs://xxx/warehouse2/tmp_table/.part-6b51dbc2-e169-43a8-93b2-eb8d2be45054-0-0.inprogress.d77fa76c-4760-4cb6-bb5b-97d70afff000
	at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1218)
	at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1210)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1210)
	at org.apache.flink.connectors.hive.write.HiveBulkWriterFactory$1.getSize(HiveBulkWriterFactory.java:54)
	at org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter.getSize(HadoopPathBasedPartFileWriter.java:84)
	at org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:451)
	at org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:421)
	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:193)
	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
	at org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:118)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
	at StreamExecCalc$2.processElement(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
	at SourceConversion$1.processElement(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.runFetchLoop(Kafka010Fetcher.java:151)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)


Best,
Paul Lam


Re: FileNotFoundException when writing Hive orc tables

Posted by Paul Lam <pa...@gmail.com>.
Thanks for your help anyway, Jingsong & Rui.

I read the Jira description, and I'm +1 on checking the lazy initialization first. It looks like the file creation is either skipped or does not block the write: I've seen a bucket writing to a file that could not exist yet, e.g. because its parent directory had not been created.
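
To make the suspected failure mode concrete, here is a minimal sketch using only the JDK. It assumes the writer creates its in-progress file lazily, so a size-based rolling check can run before the file exists. The class `LazyPartFileSize` and the methods `sizeOrZero` and `shouldRoll` are hypothetical names for illustration; this is neither Flink's code nor the fix tracked in FLINK-18659.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical illustration only; not Flink's or Hive's actual code. */
public class LazyPartFileSize {

    /** Returns the on-disk size, treating a file that was never created as empty. */
    static long sizeOrZero(Path partFile) {
        try {
            return Files.size(partFile);
        } catch (NoSuchFileException e) {
            // Assumption: a lazily initialized writer has not created the file yet.
            return 0L;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Size-based rolling check that tolerates a missing in-progress file. */
    static boolean shouldRoll(Path partFile, long maxBytes) {
        return sizeOrZero(partFile) >= maxBytes;
    }

    public static void main(String[] args) throws IOException {
        // Neither the file nor its parent directory exists.
        Path missing = Paths.get("no-such-dir", ".part-0-0.inprogress");
        System.out.println(shouldRoll(missing, 128L)); // false

        // A real file larger than the threshold triggers a roll.
        Path existing = Files.createTempFile("part-", ".inprogress");
        Files.write(existing, new byte[256]);
        System.out.println(shouldRoll(existing, 128L)); // true
        Files.delete(existing);
    }
}
```

An unguarded size lookup on the missing path would throw instead, which matches the shape of the stack trace in the original mail, where `getFileStatus` fails inside the rolling policy's `shouldRollOnEvent`.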

Best,
Paul Lam

> On Jul 21, 2020, at 18:50, Jingsong Li <ji...@gmail.com> wrote:
> 
> Hi,
> 
> Sorry for this. This workaround only works with Hive 2+.
> We can only wait for 1.11.2.
> 
> Best,
> Jingsong
> 
> On Tue, Jul 21, 2020 at 6:15 PM Rui Li <lirui.fudan@gmail.com> wrote:
> Hi Paul,
> 
> I believe Jingsong meant that you should try the native writer. The option key is `table.exec.hive.fallback-mapred-writer`, and it defaults to true.
> You can set it to false like this: tableEnv.getConfig().getConfiguration().set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false)
> 
> On Tue, Jul 21, 2020 at 6:07 PM Paul Lam <paullin3280@gmail.com> wrote:
> Hi JingSong,
> 
> Thanks for your advice! But IIUC, it seems that `table.exec.hive.fallback-mapred-reader` is false by default? 
> 
> Moreover, explicitly setting this option might cause a serialization issue. Wonder if I’m setting it in the right way? 
> 
> ```
> tableEnv.getConfig().getConfiguration().setString("table.exec.hive.fallback-mapred-writer", "false");
> ```
> 
> The error it caused:
> 
> Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:715)
> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
> 	at com.my.package.class(JobEntry.java:65)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> 	... 11 more
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot serialize operator object class org.apache.flink.streaming.api.operators.SimpleOperatorFactory.
> 	at org.apache.flink.streaming.api.graph.StreamConfig.setStreamOperatorFactory(StreamConfig.java:263)
> 	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setVertexConfig(StreamingJobGraphGenerator.java:495)
> 	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:314)
> 	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:288)
> 	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:288)
> 	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:288)
> 	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setChaining(StreamingJobGraphGenerator.java:260)
> 	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:169)
> 	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:109)
> 	at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:850)
> 	at org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
> 	at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
> 	at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:55)
> 	at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:62)
> 	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1812)
> 	at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
> 	at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57)
> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699)
> 	... 19 more
> Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> 	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
> 	at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:515)
> 	at org.apache.flink.streaming.api.graph.StreamConfig.setStreamOperatorFactory(StreamConfig.java:260)
> 	... 36 more
> 
> Best,
> Paul Lam
> 
>> On Jul 21, 2020, at 16:59, Jingsong Li <jingsonglee0@gmail.com> wrote:
>> 
>> Hi Paul,
>> 
>> If your ORC table has no complex (list, map, row) types, you can try setting `table.exec.hive.fallback-mapred-writer` to false in TableConfig. The Hive sink will then use the native ORC writer; this is a workaround.
>> 
>> About this error, I think this is a bug with Hive 1.1 ORC. I will try to reproduce it.
>> 
>> I created https://issues.apache.org/jira/browse/FLINK-18659 to track this. If it is a bug, it should be fixed in 1.11.2.
>> 
>> Best,
>> Jingsong
>> 
>> On Tue, Jul 21, 2020 at 4:25 PM Rui Li <lirui.fudan@gmail.com> wrote:
>> Hey Paul,
>> 
>> Could you please share more about your job, e.g. the schema of your Hive table, whether it's partitioned, and the table properties you've set?
>> 
>> 
>> -- 
>> Best regards!
>> Rui Li
>> 
>> 
>> -- 
>> Best, Jingsong Lee
> 
> 
> 
> -- 
> Best regards!
> Rui Li
> 
> 
> -- 
> Best, Jingsong Lee


Re: FileNotFoundException when writing Hive orc tables

Posted by Jingsong Li <ji...@gmail.com>.
Hi,

Sorry for this. This workaround only works with Hive 2+.
We can only wait for 1.11.2.

Best,
Jingsong

On Tue, Jul 21, 2020 at 6:15 PM Rui Li <li...@gmail.com> wrote:

> Hi Paul,
>
> I believe Jingsong meant that you should try the native writer. The option
> key is `table.exec.hive.fallback-mapred-writer`, and it defaults to true.
> You can set it to false like this:
> tableEnv.getConfig().getConfiguration().set(
> HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false)
>
> On Tue, Jul 21, 2020 at 6:07 PM Paul Lam <pa...@gmail.com> wrote:
>
>> Hi JingSong,
>>
>> Thanks for your advice! But IIUC, it seems
>> that `table.exec.hive.fallback-mapred-reader` is false by default?
>>
>> Moreover, explicitly setting this option might cause a serialization
>> issue. Wonder if I’m setting it in the right way?
>>
>> ```
>>
>> tableEnv.getConfig().getConfiguration().setString("table.exec.hive.fallback-mapred-writer", "false");
>>
>> ```
>>
>> The error it caused:
>>
>> Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
>> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:715)
>> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
>> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
>> 	at com.my.package.class(JobEntry.java:65)
>> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> 	at java.lang.reflect.Method.invoke(Method.java:498)
>> 	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>> 	... 11 more
>> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot serialize operator object class org.apache.flink.streaming.api.operators.SimpleOperatorFactory.
>> 	at org.apache.flink.streaming.api.graph.StreamConfig.setStreamOperatorFactory(StreamConfig.java:263)
>> 	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setVertexConfig(StreamingJobGraphGenerator.java:495)
>> 	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:314)
>> 	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:288)
>> 	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:288)
>> 	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:288)
>> 	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setChaining(StreamingJobGraphGenerator.java:260)
>> 	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:169)
>> 	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:109)
>> 	at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:850)
>> 	at org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
>> 	at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
>> 	at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:55)
>> 	at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:62)
>> 	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1812)
>> 	at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
>> 	at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57)
>> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699)
>> 	... 19 more
>> Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
>> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>> 	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>> 	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
>> 	at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:515)
>> 	at org.apache.flink.streaming.api.graph.StreamConfig.setStreamOperatorFactory(StreamConfig.java:260)
>> 	... 36 more
>>
>>
>> Best,
>> Paul Lam
>>
>> On Jul 21, 2020, at 16:59, Jingsong Li <ji...@gmail.com> wrote:
>>
>> Hi Paul,
>>
>> If your ORC table has no complex (list, map, row) types, you can try
>> setting `table.exec.hive.fallback-mapred-writer` to false in TableConfig.
>> The Hive sink will then use the native ORC writer; this is a workaround.
>>
>> About this error, I think this is a bug with Hive 1.1 ORC. I will try to
>> reproduce it.
>>
>> I created https://issues.apache.org/jira/browse/FLINK-18659 to track
>> this. If it is a bug, it should be fixed in 1.11.2.
>>
>> Best,
>> Jingsong
>>
>> On Tue, Jul 21, 2020 at 4:25 PM Rui Li <li...@gmail.com> wrote:
>>
>>> Hey Paul,
>>>
>>> Could you please share more about your job, e.g. the schema of your Hive
>>> table, whether it's partitioned, and the table properties you've set?
>>>
>>>
>>> --
>>> Best regards!
>>> Rui Li
>>>
>>
>>
>> --
>> Best, Jingsong Lee
>>
>>
>>
>
> --
> Best regards!
> Rui Li
>


-- 
Best, Jingsong Lee

Re: FileNotFoundException when writing Hive orc tables

Posted by Rui Li <li...@gmail.com>.
Hi Paul,

I believe Jingsong meant that you should try the native writer. The option
key is `table.exec.hive.fallback-mapred-writer`, and it defaults to true.
You can set it to false like this:
tableEnv.getConfig().getConfiguration().set(
HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false)
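
As a rough illustration of why the typed constant is safer than a hand-typed key, here is a self-contained sketch with stand-in classes. `BoolOption` and `Conf` are hypothetical and only mimic the idea of a typed option with a default; they are not Flink's `ConfigOption` or `Configuration`. The key names and the default of true for the writer option are taken from this thread.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-ins for illustration; not Flink's configuration classes. */
public class OptionKeySketch {

    /** A typed option: a key plus the default used when the key is unset. */
    static final class BoolOption {
        final String key;
        final boolean defaultValue;

        BoolOption(String key, boolean defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }
    }

    // Key and default as stated in this thread.
    static final BoolOption FALLBACK_MAPRED_WRITER =
            new BoolOption("table.exec.hive.fallback-mapred-writer", true);

    /** A string-keyed store, similar in spirit to a generic configuration object. */
    static final class Conf {
        private final Map<String, String> values = new HashMap<>();

        void setString(String key, String value) {
            values.put(key, value);
        }

        void set(BoolOption option, boolean value) {
            values.put(option.key, Boolean.toString(value));
        }

        boolean get(BoolOption option) {
            String raw = values.get(option.key);
            return raw == null ? option.defaultValue : Boolean.parseBoolean(raw);
        }
    }

    /** Setting the similarly named reader key leaves the writer option at its default. */
    static boolean writerFallbackAfterWrongKey() {
        Conf conf = new Conf();
        conf.setString("table.exec.hive.fallback-mapred-reader", "false");
        return conf.get(FALLBACK_MAPRED_WRITER);
    }

    /** Setting through the typed constant changes the intended option. */
    static boolean writerFallbackAfterTypedSet() {
        Conf conf = new Conf();
        conf.set(FALLBACK_MAPRED_WRITER, false);
        return conf.get(FALLBACK_MAPRED_WRITER);
    }

    public static void main(String[] args) {
        System.out.println(writerFallbackAfterWrongKey()); // true
        System.out.println(writerFallbackAfterTypedSet()); // false
    }
}
```

With a string key, mixing up `fallback-mapred-reader` and `fallback-mapred-writer` fails silently, whereas a reference to a constant that does not exist is caught at compile time.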

On Tue, Jul 21, 2020 at 6:07 PM Paul Lam <pa...@gmail.com> wrote:

> Hi JingSong,
>
> Thanks for your advice! But IIUC, it seems
> that `table.exec.hive.fallback-mapred-reader` is false by default?
>
> Moreover, explicitly setting this option might cause a serialization
> issue. Wonder if I’m setting it in the right way?
>
> ```
>
> tableEnv.getConfig().getConfiguration().setString("table.exec.hive.fallback-mapred-writer", "false");
>
> ```
>
> The error it caused:
>
> Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:715)
> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
> 	at com.my.package.class(JobEntry.java:65)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> 	... 11 more
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot serialize operator object class org.apache.flink.streaming.api.operators.SimpleOperatorFactory.
> 	at org.apache.flink.streaming.api.graph.StreamConfig.setStreamOperatorFactory(StreamConfig.java:263)
> 	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setVertexConfig(StreamingJobGraphGenerator.java:495)
> 	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:314)
> 	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:288)
> 	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:288)
> 	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:288)
> 	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setChaining(StreamingJobGraphGenerator.java:260)
> 	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:169)
> 	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:109)
> 	at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:850)
> 	at org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
> 	at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
> 	at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:55)
> 	at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:62)
> 	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1812)
> 	at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
> 	at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57)
> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699)
> 	... 19 more
> Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> 	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
> 	at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:515)
> 	at org.apache.flink.streaming.api.graph.StreamConfig.setStreamOperatorFactory(StreamConfig.java:260)
> 	... 36 more
>
>
> Best,
> Paul Lam
>
> 2020年7月21日 16:59,Jingsong Li <ji...@gmail.com> 写道:
>
> Hi Paul,
>
> If your orc table has no complex(list,map,row) types, you can try to set
> `table.exec.hive.fallback-mapred-writer` to false in TableConfig. And Hive
> sink will use ORC native writer, it is a work-around way.
>
> About this error, I think this is a bug for Hive 1.1 ORC. I will try to
> re-produce it.
>
> I created https://issues.apache.org/jira/browse/FLINK-18659 to track
> this. If it is a bug, it should be fixed in 1.11.2
>
> Best,
> Jingsong
>
> On Tue, Jul 21, 2020 at 4:25 PM Rui Li <li...@gmail.com> wrote:
>
>> Hey Paul,
>>
>> Could you please share more about your job, e.g. the schema of your Hive
>> table, whether it's partitioned, and the table properties you've set?
>>
>> On Tue, Jul 21, 2020 at 4:02 PM Paul Lam <pa...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I'm doing a POC on Hive connectors and find that when writing orc format
>>> Hive tables, the job failed with FileNotFoundException right after
>>> ingesting data (full stacktrace at the bottom of the mail).
>>>
>>> The error can be steadily reproduced in my environment, which is Hadoop
>>> 2.6.5(CDH-5.6.0), Hive 1.1.0(CDH-5.6.0) and Flink 1.11.0. It only happens
>>> in orc tables, while other bulk formats are fine.
>>>
>>> Does anyone have an idea about this error? Any comment and suggestions
>>> are appreciated. Thanks!
>>>
>>> Stacktrace:
>>>
>>> Caused by: java.io.FileNotFoundException: File does not exist:
>>> hdfs://xxx/warehouse2/tmp_table/.part-6b51dbc2-e169-43a8-93b2-eb8d2be45054-0-0.inprogress.d77fa76c-4760-4cb6-bb5b-97d70afff000
>>> at
>>> org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1218)
>>> at
>>> org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1210)
>>> at
>>> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>>> at
>>> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1210)
>>> at
>>> org.apache.flink.connectors.hive.write.HiveBulkWriterFactory$1.getSize(HiveBulkWriterFactory.java:54)
>>> at
>>> org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter.getSize(HadoopPathBasedPartFileWriter.java:84)
>>> at
>>> org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:451)
>>> at
>>> org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:421)
>>> at
>>> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:193)
>>> at
>>> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
>>> at
>>> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
>>> at
>>> org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:118)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>>> at
>>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>>> at
>>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>>> at StreamExecCalc$2.processElement(Unknown Source)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>>> at
>>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>>> at
>>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>>> at SourceConversion$1.processElement(Unknown Source)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>>> at
>>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>>> at
>>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.runFetchLoop(Kafka010Fetcher.java:151)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>>>
>>>
>>> Best,
>>> Paul Lam
>>>
>>>
>>
>> --
>> Best regards!
>> Rui Li
>>
>
>
> --
> Best, Jingsong Lee
>
>
>

-- 
Best regards!
Rui Li

Re: FileNotFoundException when writting Hive orc tables

Posted by Paul Lam <pa...@gmail.com>.
Hi JingSong,

Thanks for your advice! But IIUC, it seems that `table.exec.hive.fallback-mapred-reader` is false by default? 

Moreover, explicitly setting this option seems to cause a serialization issue. I wonder if I'm setting it the right way?

```
tableEnv.getConfig().getConfiguration().setString("table.exec.hive.fallback-mapred-writer", "false");
```

The error it caused:

Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:715)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
	at com.my.package.class(JobEntry.java:65)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
	... 11 more
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot serialize operator object class org.apache.flink.streaming.api.operators.SimpleOperatorFactory.
	at org.apache.flink.streaming.api.graph.StreamConfig.setStreamOperatorFactory(StreamConfig.java:263)
	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setVertexConfig(StreamingJobGraphGenerator.java:495)
	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:314)
	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:288)
	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:288)
	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:288)
	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setChaining(StreamingJobGraphGenerator.java:260)
	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:169)
	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:109)
	at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:850)
	at org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
	at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
	at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:55)
	at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:62)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1812)
	at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
	at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699)
	... 19 more
Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
	at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:515)
	at org.apache.flink.streaming.api.graph.StreamConfig.setStreamOperatorFactory(StreamConfig.java:260)
	... 36 more
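
For reference, the innermost cause above is plain Java serialization rejecting an object graph that reaches an `org.apache.hadoop.conf.Configuration`, which does not implement `java.io.Serializable`. The sketch below reproduces the same mechanism with the JDK only. `FakeHadoopConf`, `FactoryHoldingConf` and `FactoryWithTransientConf` are hypothetical stand-ins invented for illustration; they are not Flink or Hadoop classes, and this says nothing about where the reference is actually held in the Hive sink.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class SerializationSketch {

    // Hypothetical stand-in for a config class that does NOT implement Serializable.
    static class FakeHadoopConf {
        String defaultFs = "hdfs://example";
    }

    // Hypothetical factory that keeps the config as an ordinary field.
    static class FactoryHoldingConf implements Serializable {
        private static final long serialVersionUID = 1L;
        FakeHadoopConf conf = new FakeHadoopConf();
    }

    // Hypothetical factory that marks the config transient, so serialization skips it.
    static class FactoryWithTransientConf implements Serializable {
        private static final long serialVersionUID = 1L;
        transient FakeHadoopConf conf = new FakeHadoopConf();
    }

    // Returns null if obj serializes, otherwise the class name carried by
    // the NotSerializableException.
    static String trySerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(trySerialize(new FactoryHoldingConf()));
        System.out.println(trySerialize(new FactoryWithTransientConf()));
    }
}
```

The first call reports the non-serializable class, and the second returns null because the transient field is skipped. A transient field is null after deserialization, so real code would have to rebuild it on the receiving side.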

Best,
Paul Lam



Re: FileNotFoundException when writting Hive orc tables

Posted by Jingsong Li <ji...@gmail.com>.
Hi Paul,

If your ORC table has no complex (list, map, row) types, you can try setting
`table.exec.hive.fallback-mapred-writer` to false in TableConfig. The Hive
sink will then use the native ORC writer. This is only a workaround.

About this error, I think it is a bug related to Hive 1.1 ORC. I will try to
reproduce it.

I created https://issues.apache.org/jira/browse/FLINK-18659 to track this.
If it is a bug, it should be fixed in 1.11.2.

Best,
Jingsong


-- 
Best, Jingsong Lee

Re: FileNotFoundException when writting Hive orc tables

Posted by Paul Lam <pa...@gmail.com>.
Hi Rui,

I reproduced the error with a minimal case; the SQL is similar to `insert into hive_table_x select simple_string from kafka_table_b`.

I'm pretty sure it's not related to the table schema. I also removed all the optional properties from the Hive table DDL, and the error still happened.

Best,
Paul Lam



Re: FileNotFoundException when writing Hive orc tables

Posted by Rui Li <li...@gmail.com>.
Hey Paul,

Could you please share more about your job, e.g. the schema of your Hive
table, whether it's partitioned, and the table properties you've set?

On Tue, Jul 21, 2020 at 4:02 PM Paul Lam <pa...@gmail.com> wrote:

> Hi,
>
> I'm doing a POC on Hive connectors and find that when writing orc format
> Hive tables, the job failed with FileNotFoundException right after
> ingesting data (full stacktrace at the bottom of the mail).
>
> The error can be steadily reproduced in my environment, which is Hadoop
> 2.6.5(CDH-5.6.0), Hive 1.1.0(CDH-5.6.0) and Flink 1.11.0. It only happens
> in orc tables, while other bulk formats are fine.
>
> Does anyone have an idea about this error? Any comment and suggestions are
> appreciated. Thanks!
>
> Stacktrace:
>
> Caused by: java.io.FileNotFoundException: File does not exist: hdfs://xxx/warehouse2/tmp_table/.part-6b51dbc2-e169-43a8-93b2-eb8d2be45054-0-0.inprogress.d77fa76c-4760-4cb6-bb5b-97d70afff000
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1218)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1210)
> 	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1210)
> 	at org.apache.flink.connectors.hive.write.HiveBulkWriterFactory$1.getSize(HiveBulkWriterFactory.java:54)
> 	at org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter.getSize(HadoopPathBasedPartFileWriter.java:84)
> 	at org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:451)
> 	at org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:421)
> 	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:193)
> 	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
> 	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
> 	at org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:118)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> 	at StreamExecCalc$2.processElement(Unknown Source)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> 	at SourceConversion$1.processElement(Unknown Source)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> 	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> 	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
> 	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
> 	at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.runFetchLoop(Kafka010Fetcher.java:151)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> 	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>
>
> Best,
> Paul Lam
>
>
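
For readers following the trace: the top frames show the rolling policy (TableRollingPolicy.shouldRollOnEvent) asking the part-file writer for its size, which ends in DistributedFileSystem.getFileStatus on the in-progress file and fails because that file does not exist. One possible explanation, which is only an assumption and not confirmed anywhere in this thread, is that the file is not created on disk until the writer first flushes. The sketch below illustrates that failure mode with plain java.nio on a local temp directory instead of HDFS. The class and method names (PartFileSizeProbe, sizeStrict, sizeOrZero) are made up for illustration and are not Flink or Hadoop APIs.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

// Hypothetical helper, not part of Flink or Hadoop. It mimics a size check
// on an in-progress part file that may not have been created yet.
final class PartFileSizeProbe {

    // Strict probe: fails if the file does not exist, similar in spirit to
    // the getFileStatus call at the top of the stack trace.
    static long sizeStrict(Path partFile) throws IOException {
        return Files.size(partFile);
    }

    // Defensive probe: treats a missing file as "nothing written yet".
    // This is only one possible way to handle the case, shown as a sketch.
    static long sizeOrZero(Path partFile) throws IOException {
        try {
            return Files.size(partFile);
        } catch (NoSuchFileException e) {
            return 0L;
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("part-file-probe");
        // Assumed scenario: the writer has buffered rows but has not yet
        // created the file on disk.
        Path inProgress = dir.resolve(".part-0-0.inprogress");

        System.out.println("defensive size: " + sizeOrZero(inProgress));
        try {
            sizeStrict(inProgress);
        } catch (NoSuchFileException e) {
            System.out.println("strict probe failed: file does not exist");
        }
    }
}
```

Whether returning 0 for a missing file is appropriate depends on how the rolling policy uses the size, so this is meant as an illustration of the symptom rather than a proposed fix.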

-- 
Best regards!
Rui Li