You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Ivan Budincevic <ib...@bol.com> on 2017/11/07 18:51:35 UTC

AvroParquetWriter may cause task managers to get lost

Hi all,

We recently implemented a feature in our streaming flink job in which we have a AvroParquetWriter which we build every time the overridden “write” method from org.apache.flink.streaming.connectors.fs.Writer gets called. We had to do this because the schema of each record is potentially different and we have to get the schema for the AvroParquetWriter out of the record itself first. Previously this builder was built only one time in the “open” method and from then only the write method was called per record.

Since implementing this our job crashes with “Connection unexpectedly closed by remote task manager ‘internal company url’. This might indicate that the remote task manager was lost.”

We did not run into any issues on our test environments, so we are suspecting this problem occurs only on higher loads as we have on our production environment. Unfortunately we still don’t have a proper means of reproducing this much load on our test environment to debug.

Would having the AvroParquetWriter being built on every write be causing the problem and if so why would that be the case?

Any help in getting to the bottom of the issue would be really appreciated. Bellow there is a code snippet of the class which uses the AvroParquetWriter.

Best regards,
Ivan Budincevic
Software engineer, bol.com
Netherlands

package com.bol.measure.timeblocks.files;

import com.bol.measure.timeblocks.measurement.SlottedMeasurements;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.io.IOException;

public class SlottedMeasurementsWriter implements Writer<SlottedMeasurements> {
  private transient ParquetWriter<GenericRecord> parquetWriter;
  private boolean overwrite;
  private Path path;

  public SlottedMeasurementsWriter(boolean overwrite) {
    this.overwrite = overwrite;
  }

  @Override
  public void open(FileSystem fs, Path path) throws IOException {
    this.path = path;
  }

  @Override
  public long flush() throws IOException {
    return parquetWriter.getDataSize();
  }

  @Override
  public long getPos() throws IOException {
    return parquetWriter.getDataSize();
  }

  @Override
  public void close() throws IOException {
    parquetWriter.close();
  }

  @Override
  public void write(SlottedMeasurements slot) throws IOException {

    final AvroParquetWriter.Builder<GenericRecord> writerBuilder =
      AvroParquetWriter
        .<GenericRecord>builder(path)
        .withSchema(slot.getMeasurements().get(0).getSchema())
        .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
        .withDictionaryEncoding(true)
        .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0);
    if (overwrite) {
      writerBuilder.withWriteMode(ParquetFileWriter.Mode.OVERWRITE);
    }

    parquetWriter = writerBuilder.build();

    for (GenericRecord measurement : slot.getMeasurements()) {
      parquetWriter.write(measurement);
    }
  }


  @Override
  public Writer<SlottedMeasurements> duplicate() {
    return new SlottedMeasurementsWriter(this.overwrite);
  }
}



Re: AvroParquetWriter may cause task managers to get lost

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Ivan,

I don't have much experience with Avro, but extracting the schema and
creating a writer for each record sounds like a pretty expensive approach.
This might result in significant load and increased GC activity.

Do all records have a different schema or might it make sense to cache the
writers in a weak hashmap?

Best, Fabian


2017-11-07 19:51 GMT+01:00 Ivan Budincevic <ib...@bol.com>:

> Hi all,
>
>
>
> We recently implemented a feature in our streaming flink job in which we
> have a AvroParquetWriter which we build every time the overridden “write”
> method from org.apache.flink.streaming.connectors.fs.Writer gets called.
> We had to do this because the schema of each record is potentially
> different and we have to get the schema for the AvroParquetWriter out of
> the record itself first. Previously this builder was built only one time in
> the “open” method and from then only the write method was called per
> record.
>
>
>
> Since implementing this our job crashes with “Connection unexpectedly
> closed by remote task manager ‘internal company url’. This might indicate
> that the remote task manager was lost.”
>
>
>
> We did not run into any issues on our test environments, so we are
> suspecting this problem occurs only on higher loads as we have on our
> production environment. Unfortunately we still don’t have a proper means of
> reproducing this much load on our test environment to debug.
>
>
>
> Would having the AvroParquetWriter being built on every write be causing
> the problem and if so why would that be the case?
>
>
>
> Any help in getting to the bottom of the issue would be really
> appreciated. Bellow there is a code snippet of the class which uses the
> AvroParquetWriter.
>
>
>
> Best regards,
>
> Ivan Budincevic
>
> Software engineer, bol.com
>
> Netherlands
>
>
>
> package com.bol.measure.timeblocks.files;
>
> import com.bol.measure.timeblocks.measurement.SlottedMeasurements;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.flink.streaming.connectors.fs.Writer;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.parquet.avro.AvroParquetWriter;
> import org.apache.parquet.column.ParquetProperties;
> import org.apache.parquet.hadoop.ParquetFileWriter;
> import org.apache.parquet.hadoop.ParquetWriter;
> import org.apache.parquet.hadoop.metadata.CompressionCodecName;
>
> import java.io.IOException;
>
> public class SlottedMeasurementsWriter implements
> Writer<SlottedMeasurements> {
>   private transient ParquetWriter<GenericRecord> parquetWriter;
>   private boolean overwrite;
>   private Path path;
>
>   public SlottedMeasurementsWriter(boolean overwrite) {
>     this.overwrite = overwrite;
>   }
>
>   @Override
>   public void open(FileSystem fs, Path path) throws IOException {
>     this.path = path;
>   }
>
>   @Override
>   public long flush() throws IOException {
>     return parquetWriter.getDataSize();
>   }
>
>   @Override
>   public long getPos() throws IOException {
>     return parquetWriter.getDataSize();
>   }
>
>   @Override
>   public void close() throws IOException {
>     parquetWriter.close();
>   }
>
>   @Override
>   public void write(SlottedMeasurements slot) throws IOException {
>
>     final AvroParquetWriter.Builder<GenericRecord> writerBuilder =
>       AvroParquetWriter
>         .<GenericRecord>*builder*(path)
>         .withSchema(slot.getMeasurements().get(0).getSchema())
>         .withCompressionCodec(CompressionCodecName.*UNCOMPRESSED*)
>         .withDictionaryEncoding(true)
>         .withWriterVersion(ParquetProperties.WriterVersion.*PARQUET_1_0*);
>     if (overwrite) {
>       writerBuilder.withWriteMode(ParquetFileWriter.Mode.*OVERWRITE*);
>     }
>
>     parquetWriter = writerBuilder.build();
>
>     for (GenericRecord measurement : slot.getMeasurements()) {
>       parquetWriter.write(measurement);
>     }
>   }
>
>
>   @Override
>   public Writer<SlottedMeasurements> duplicate() {
>     return new SlottedMeasurementsWriter(this.overwrite);
>   }
> }
>
>
>
>
>

Re: AvroParquetWriter may cause task managers to get lost

Posted by Nico Kruber <ni...@data-artisans.com>.
Hi Ivan,
sure, the more work you do per record, the slower the sink will be. However, 
this should not influence (much) the liveness checks inside flink.
Do you get some meaningful entries in the TaskManagers' logs indicating the 
problem?

I'm no expert on Avro and don't know how much actual work it is to create such 
a writer, but from the code you gave:
- wouldn't your getPos() circumvent the BucketingSink's rolling file property? 
- similarly for flush() which may be dangerous during recovery (judging from 
its documentation - "returns the offset that the file must be truncated to at 
recovery")?


Nico

On Tuesday, 7 November 2017 19:51:35 CET Ivan Budincevic wrote:
> Hi all,
> 
> We recently implemented a feature in our streaming flink job in which we
> have a AvroParquetWriter which we build every time the overridden “write”
> method from org.apache.flink.streaming.connectors.fs.Writer gets called. We
> had to do this because the schema of each record is potentially different
> and we have to get the schema for the AvroParquetWriter out of the record
> itself first. Previously this builder was built only one time in the “open”
> method and from then only the write method was called per record.
 
> Since implementing this our job crashes with “Connection unexpectedly closed
> by remote task manager ‘internal company url’. This might indicate that the
> remote task manager was lost.”
 
> We did not run into any issues on our test environments, so we are
> suspecting this problem occurs only on higher loads as we have on our
> production environment. Unfortunately we still don’t have a proper means of
> reproducing this much load on our test environment to debug.
 
> Would having the AvroParquetWriter being built on every write be causing the
> problem and if so why would that be the case?
 
> Any help in getting to the bottom of the issue would be really appreciated.
> Bellow there is a code snippet of the class which uses the
> AvroParquetWriter.
 
> Best regards,
> Ivan Budincevic
> Software engineer, bol.com
> Netherlands
> 
> package com.bol.measure.timeblocks.files;
> 
> import com.bol.measure.timeblocks.measurement.SlottedMeasurements;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.flink.streaming.connectors.fs.Writer;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.parquet.avro.AvroParquetWriter;
> import org.apache.parquet.column.ParquetProperties;
> import org.apache.parquet.hadoop.ParquetFileWriter;
> import org.apache.parquet.hadoop.ParquetWriter;
> import org.apache.parquet.hadoop.metadata.CompressionCodecName;
> 
> import java.io.IOException;
> 
> public class SlottedMeasurementsWriter implements
> Writer<SlottedMeasurements> {
 private transient
> ParquetWriter<GenericRecord> parquetWriter;
>   private boolean overwrite;
>   private Path path;
> 
>   public SlottedMeasurementsWriter(boolean overwrite) {
>     this.overwrite = overwrite;
>   }
> 
>   @Override
>   public void open(FileSystem fs, Path path) throws IOException {
>     this.path = path;
>   }
> 
>   @Override
>   public long flush() throws IOException {
>     return parquetWriter.getDataSize();
>   }
> 
>   @Override
>   public long getPos() throws IOException {
>     return parquetWriter.getDataSize();
>   }
> 
>   @Override
>   public void close() throws IOException {
>     parquetWriter.close();
>   }
> 
>   @Override
>   public void write(SlottedMeasurements slot) throws IOException {
> 
>     final AvroParquetWriter.Builder<GenericRecord> writerBuilder =
>       AvroParquetWriter
>         .<GenericRecord>builder(path)
>         .withSchema(slot.getMeasurements().get(0).getSchema())
>         .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
>         .withDictionaryEncoding(true)
>         .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0);
>     if (overwrite) {
>       writerBuilder.withWriteMode(ParquetFileWriter.Mode.OVERWRITE);
>     }
> 
>     parquetWriter = writerBuilder.build();
> 
>     for (GenericRecord measurement : slot.getMeasurements()) {
>       parquetWriter.write(measurement);
>     }
>   }
> 
> 
>   @Override
>   public Writer<SlottedMeasurements> duplicate() {
>     return new SlottedMeasurementsWriter(this.overwrite);
>   }
> }
> 
>