Posted to user@flink.apache.org by Tao Xia <ta...@udacity.com> on 2017/12/19 00:30:33 UTC

Pending parquet file with BucketingSink

Hi All,
  Do you guys write parquet files using BucketingSink? I ran into an issue
where all the parquet files are stuck in pending status.  Any ideas?

processedStream is a DataStream of NDEvent.

The output files all look like this: "_part-0-0.pending"

val parquetSink = new BucketingSink[NDEvent]("/tmp/")
parquetSink.setBucketer(new DateTimeBucketer[NDEvent]("yyyy-MM-dd/HH"))
parquetSink.setWriter(new SinkParquetWriter(NDEvent.getClassSchema.toString))
processedStream.addSink(parquetSink)

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class SinkParquetWriter<T> implements Writer<T> {

    // Created in open(); ParquetWriter is not serializable, hence transient.
    transient ParquetWriter writer = null;
    String schema = null;

    public SinkParquetWriter(String schema) {
        this.schema = schema;
    }

    @Override
    public void open(FileSystem fileSystem, Path path) throws IOException {
        writer = AvroParquetWriter.builder(path)
                .withSchema(new Schema.Parser().parse(schema))
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .build();
    }

    @Override
    public long flush() throws IOException {
        // ParquetWriter buffers row groups in memory, so getDataSize()
        // is only an estimate of the bytes written so far.
        return writer.getDataSize();
    }

    @Override
    public long getPos() throws IOException {
        return writer.getDataSize();
    }

    @Override
    public void close() throws IOException {
        writer.close();
    }

    @Override
    public void write(T t) throws IOException {
        writer.write(t);
    }

    @Override
    public Writer<T> duplicate() {
        return new SinkParquetWriter<T>(schema);
    }
}


Thanks,
Tao

Re: Pending parquet file with BucketingSink

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

Your analysis is correct. If the program ends before a checkpoint can be
taken, the files will never be moved to the "final" state. We could move
all files to the "final" state when the sink is closing, but the problem
is that Flink currently doesn't provide a way for user functions (which
sinks are) to distinguish between an erroneous close and a close because
the stream ended, so we cannot do this yet. We are aware of the problem,
and this is the JIRA issue tracking it:
https://issues.apache.org/jira/browse/FLINK-2646
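
One workaround for finite test pipelines is to keep the source alive long
enough for at least one checkpoint to complete before the job finishes. A
rough sketch (the source class below is made up for illustration, not a
Flink API; it assumes checkpointing is enabled with an interval shorter
than the linger time):

import java.util.List;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Illustrative only: a bounded test source that lingers after emitting
// its last element so that a periodic checkpoint can fire and let the
// BucketingSink promote its ".pending" files.
public class LingeringTestSource implements SourceFunction<NDEvent> {

    private final List<NDEvent> events;
    private volatile boolean running = true;

    public LingeringTestSource(List<NDEvent> events) {
        this.events = events;
    }

    @Override
    public void run(SourceContext<NDEvent> ctx) throws Exception {
        for (NDEvent event : events) {
            // Emit under the checkpoint lock so elements and checkpoints
            // do not interleave.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(event);
            }
        }
        // Stay alive past the next checkpoint instead of finishing at once.
        long deadline = System.currentTimeMillis() + 30_000L;
        while (running && System.currentTimeMillis() < deadline) {
            Thread.sleep(100L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}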

Best,
Aljoscha 

> On 20. Dec 2017, at 19:05, xiatao123 <ta...@udacity.com> wrote:
> 
> Hi Vipul,
>  Thanks for the information.  Yes, I do have checkpointing enabled with
> a 10 ms interval.
>  I think the issue here is that the stream ended before a checkpoint was
> reached.  This is test code where the DataStream only has 5 events and
> then ends.  Once the stream ends, no checkpoint is triggered, so the
> files remain in the "pending" state.
>  Is there any way to force a checkpoint trigger, or to let the sink know
> the stream has ended?
> Thanks,
> Tao


Re: Pending parquet file with BucketingSink

Posted by xiatao123 <ta...@udacity.com>.
Hi Vipul,
  Thanks for the information.  Yes, I do have checkpointing enabled with
a 10 ms interval.
  I think the issue here is that the stream ended before a checkpoint was
reached.  This is test code where the DataStream only has 5 events and
then ends.  Once the stream ends, no checkpoint is triggered, so the
files remain in the "pending" state.
  Is there any way to force a checkpoint trigger, or to let the sink know
the stream has ended?
Thanks,
Tao




Re: Pending parquet file with BucketingSink

Posted by vipul singh <ne...@gmail.com>.
Hi Tao,

Is checkpointing enabled in your app? The pending files are moved to the
finished state once the checkpoint that covers them completes.

Please take a look at the BucketingSink javadoc
(https://ci.apache.org/projects/flink/flink-docs-release-1.2/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html):
"If checkpointing is not enabled the pending files will never be moved to
the finished state"
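
For reference, checkpointing is enabled on the StreamExecutionEnvironment
before the job is defined. A minimal sketch (the 10-second interval is
only an example):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint every 10 seconds; on each completed checkpoint the
// BucketingSink renames ".pending" part files to their final names.
env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE);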

Thanks,
Vipul



On Mon, Dec 18, 2017 at 4:30 PM, Tao Xia <ta...@udacity.com> wrote:

> Hi All,
>   Do you guys write parquet files using BucketingSink? I ran into an
> issue where all the parquet files are stuck in pending status.  Any
> ideas?
>
> [...]
>
> Thanks,
> Tao



-- 
Thanks,
Vipul