Posted to user@flink.apache.org by vipul singh <ne...@gmail.com> on 2017/10/03 06:01:33 UTC

Weird error in submitting a flink job to yarn cluster

Hello,

I am working on a ParquetSink writer that converts a Kafka stream to
Parquet format. I am running into some odd issues when deploying this
application to a YARN cluster. I am not 100% sure this is a Flink-related
error, but I wanted to reach out to folks here in case it is.


If I launch Flink on YARN just to execute a single job, it runs fine.
This is the command I use for the deployment:

*Command:* *flink run --jobmanager yarn-cluster -ytm 4096 -yjm 1048 -ys 2
-yn 2 -d -c <class_name> jar_name.jar*

However, as soon as I try to submit a similar job to an already running YARN
cluster, I start to get these errors
(https://gist.github.com/neoeahit/f0130e9f447ea9c2baa38bf5ee4e6a57) and
the application crashes. I checked the location in /tmp where I am creating
the file, and no file exists there.

*Command:* *flink run -yid application_id -d -c <class_name> jar_name.jar*


A bit more about my algorithm: I use a temporary array to buffer messages in
the invoke method, and when a specific threshold is reached I create a Parquet
file from the buffered data. Once the temporary Parquet file is created, I
upload it to long-term storage.

The code to write buffered data to a parquet file is:

// Build a Parquet writer that targets the pending temporary file.
writer = Some(AvroParquetWriter.builder(getPendingFilePath(tmp_filename.get))
  .withSchema(schema.get)
  .withCompressionCodec(compressionCodecName)
  .withRowGroupSize(blockSize)
  .withPageSize(pageSize)
  .build())
// Write every buffered payload, then close the writer to finish the file.
bufferedMessages.foreach { e =>
  writer.get.write(e.payload)
}
writer.get.close()
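
For readers following along, the buffering described above can be sketched roughly as follows. This is only an illustration under assumptions: `ThresholdBuffer`, `maxRecords`, and the `flush` callback are hypothetical names, and a plain record count stands in for whatever threshold the real sink uses.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper: collects records and hands them to `flush`
// once `maxRecords` of them have been buffered.
class ThresholdBuffer[T](maxRecords: Int, flush: Seq[T] => Unit) {
  private val buffer = ArrayBuffer.empty[T]

  // Called once per incoming record, like a sink's invoke method.
  def invoke(record: T): Unit = {
    buffer += record
    if (buffer.size >= maxRecords) {
      // Pass an immutable copy so clearing the buffer cannot affect the batch.
      flush(buffer.toList)
      buffer.clear()
    }
  }

  // Number of records waiting for the next flush.
  def pending: Int = buffer.size
}

object ThresholdBufferDemo {
  def main(args: Array[String]): Unit = {
    val sink = new ThresholdBuffer[String](2, batch => println(s"flushing $batch"))
    Seq("a", "b", "c").foreach(sink.invoke)
    println(s"still buffered: ${sink.pending}")
  }
}
```

In the real sink, the `flush` callback would be the place where the Parquet file is written and uploaded.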


Please let me know if you have any ideas.

Thanks in advance,
- Vipul

Re: Weird error in submitting a flink job to yarn cluster

Posted by Chesnay Schepler <ch...@apache.org>.
This isn't related to Flink, but I might be able to help you out anyway.

Does the ParquetFileWriter set the 'overwrite' flag when calling
'FileSystem#create()'?

My suspicion is that you create a file for the first batch and write it
out, but do not delete it.
For the next batch, the file cannot be created (since it still exists),
and the write fails.
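
A minimal sketch of this suspected failure mode, using java.nio as a stand-in for 'FileSystem#create()' with overwriting disabled. The object and method names are made up for illustration, and this is not the Parquet writer's actual code path:

```scala
import java.nio.file.{FileAlreadyExistsException, Files, Path, StandardOpenOption}

object CreateWithoutOverwrite {
  // Stand-in for creating a file with overwrite = false:
  // CREATE_NEW refuses to open a path that already exists.
  def writeBatch(target: Path, payload: String): Unit = {
    val out = Files.newOutputStream(target, StandardOpenOption.CREATE_NEW)
    try out.write(payload.getBytes("UTF-8"))
    finally out.close()
  }

  // Returns true if the batch was written, false if the file was already there.
  def tryWriteBatch(target: Path, payload: String): Boolean =
    try { writeBatch(target, payload); true }
    catch { case _: FileAlreadyExistsException => false }

  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("parquet-sink-demo")
    val target = dir.resolve("pending.parquet")
    // The file is never deleted between batches, as in the suspected scenario.
    println(s"first batch written:  ${tryWriteBatch(target, "batch-1")}")
    println(s"second batch written: ${tryWriteBatch(target, "batch-2")}")
  }
}
```

If this turns out to be the cause, it may also be worth checking whether the writer builder in your Parquet version offers a write-mode option (something like ParquetFileWriter.Mode.OVERWRITE); I have not verified that against your setup.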

Since the application then crashes, the /tmp directory probably gets
cleaned up, which is why you don't see any leftover file.

To verify this theory, you can add a simple counter to your sink for the
number of created files. File creation should succeed for the first batch
and fail on the second one. If that is the case, you should make sure that
the file is deleted after each batch has been written.
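
Roughly what I have in mind, as a sketch only: `TempFileBatchWriter`, its `upload` callback, and the plain-text payload are hypothetical stand-ins for your sink, the upload to long-term storage, and the Parquet write.

```scala
import java.nio.file.{Files, Path, StandardOpenOption}

// Hypothetical helper, not part of Flink or Parquet.
class TempFileBatchWriter(tmpFile: Path, upload: Path => Unit) {
  private var attempts = 0

  // Counter for the number of file creations attempted so far.
  def attemptedFiles: Int = attempts

  def flush(records: Seq[String]): Unit = {
    attempts += 1
    println(s"creating file #$attempts at $tmpFile")
    try {
      // CREATE_NEW fails if a previous batch left the file behind.
      val out = Files.newOutputStream(tmpFile, StandardOpenOption.CREATE_NEW)
      try records.foreach(r => out.write((r + "\n").getBytes("UTF-8")))
      finally out.close()
      upload(tmpFile)
    } finally {
      // Remove the temp file whether or not the write or upload succeeded,
      // so that the next batch can create it again.
      Files.deleteIfExists(tmpFile)
    }
  }
}

object TempFileBatchWriterDemo {
  def main(args: Array[String]): Unit = {
    val tmp = Files.createTempDirectory("sink-demo").resolve("pending.parquet")
    val writer = new TempFileBatchWriter(tmp, p => println(s"uploading $p"))
    writer.flush(Seq("a", "b"))
    writer.flush(Seq("c"))
    println(s"files attempted: ${writer.attemptedFiles}")
  }
}
```

With the delete in place, the second flush should succeed. If you comment out the delete, the counter should show the failure on file #2, which would confirm the theory.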
