Posted to user@flink.apache.org by amran dean <ad...@gmail.com> on 2019/10/22 02:10:24 UTC

Issue with BulkWriter

Hello,
I'm using BulkWriter to write newline-delimited, LZO-compressed files. The
logic is very straightforward (see code below).

I am experiencing an issue decompressing files created in this manner,
consistently getting "lzop: unexpected end of file". Is this an issue with
the caller of BulkWriter?

(As an aside, using com.hadoop.compression.lzo.LzoCodec instead results in
gibberish. I'm very confused about what is going on.)

private final CompressionOutputStream compressedStream;

public BulkRecordLZOSerializer(OutputStream stream) {
    CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
    try {
        compressedStream = factory.getCodecByClassName("com.hadoop.compression.lzo.LzopCodec").createOutputStream(stream);
    } catch (IOException e) {
        throw new IllegalStateException("Unable to create LZO OutputStream");
    }
}

public void addElement(KafkaRecord record) throws IOException {
    compressedStream.write(record.getValue());
    compressedStream.write('\n');
}

public void finish() throws IOException {
    compressedStream.finish();
}

public void flush() throws IOException {
    compressedStream.flush();
}
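
For context, this is roughly how the writer is handed to Flink. (A sketch,
assuming the methods above live in a BulkRecordLZOSerializer class that
implements BulkWriter<KafkaRecord>; the sink path and the "records" stream
are placeholders.)

import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Flink creates one writer per part file via the factory and calls
// finish() on it before closing that part file.
StreamingFileSink<KafkaRecord> sink = StreamingFileSink
        .forBulkFormat(new Path("hdfs:///tmp/records"),
                (BulkWriter.Factory<KafkaRecord>) BulkRecordLZOSerializer::new)
        .build();

records.addSink(sink); // records is the upstream DataStream<KafkaRecord>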

Re: Issue with BulkWriter

Posted by Ravi Bhushan Ratnakar <ra...@gmail.com>.
Hi,

If possible, kindly share one output file to inspect. In the meantime, you
could also give "org.apache.hadoop.io.compress.GzipCodec" a try.
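
For example (a quick test; this assumes the standard Hadoop Gzip codec is
on your classpath), only the class name passed to the factory changes:

compressedStream = factory.getCodecByClassName("org.apache.hadoop.io.compress.GzipCodec").createOutputStream(stream);

The resulting part file can then be checked with, e.g., "gzip -d < part-1-0".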

Regards,
Ravi

On Tue, Oct 22, 2019 at 7:25 PM amran dean <ad...@gmail.com> wrote:

>
> Hello,
> These changes result in the following error:
> $ lzop -d part-1-0
> lzop: part-1-0: not a lzop file
>
>
> public class BulkRecordLZOSerializer implements BulkWriter<KafkaRecord> {
>
>     private final CompressionOutputStream compressedStream;
>
>     public BulkRecordLZOSerializer(OutputStream stream) {
>         CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
>         try {
>             compressedStream = factory.getCodecByClassName("com.hadoop.compression.lzo.LzoCodec").createOutputStream(stream);
>         } catch (IOException e) {
>             throw new IllegalStateException("Unable to create LZO OutputStream");
>         }
>     }
>
>     public void addElement(KafkaRecord record) throws IOException {
>         compressedStream.write(record.getValue());
>         compressedStream.write('\n');
>     }
>
>     public void finish() throws IOException {
>         compressedStream.flush();
>         compressedStream.finish();
>     }
>
>     public void flush() throws IOException {
>         compressedStream.flush();
>     }
> }

Re: Issue with BulkWriter

Posted by amran dean <ad...@gmail.com>.
Hello,
These changes result in the following error:
$ lzop -d part-1-0
lzop: part-1-0: not a lzop file


import java.io.IOException;
import java.io.OutputStream;

import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;

public class BulkRecordLZOSerializer implements BulkWriter<KafkaRecord> {

    private final CompressionOutputStream compressedStream;

    public BulkRecordLZOSerializer(OutputStream stream) {
        CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
        try {
            compressedStream = factory.getCodecByClassName("com.hadoop.compression.lzo.LzoCodec").createOutputStream(stream);
        } catch (IOException e) {
            throw new IllegalStateException("Unable to create LZO OutputStream");
        }
        }
    }

    public void addElement(KafkaRecord record) throws IOException {
        compressedStream.write(record.getValue());
        compressedStream.write('\n');
    }

    public void finish() throws IOException {
        compressedStream.flush();
        compressedStream.finish();
    }

    public void flush() throws IOException {
        compressedStream.flush();
    }
}


On Mon, Oct 21, 2019 at 11:17 PM Ravi Bhushan Ratnakar <
ravibhushanratnakar@gmail.com> wrote:

> Hi,
>
> It seems like you want to use "com.hadoop.compression.lzo.LzoCodec"
> instead of "com.hadoop.compression.lzo.*LzopCodec*" in the line below.
>
> compressedStream = factory.getCodecByClassName("com.hadoop.compression.lzo.LzopCodec").createOutputStream(stream);
>
>
> Regarding "lzop: unexpected end of file" problem, kindly add
> "compressedStream.flush()" in the below method to flush any leftover data
> before finishing.
>
> public void finish() throws IOException {
>   compressedStream.flush();
>   compressedStream.finish();
> }
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/serialization/BulkWriter.html#finish--
>
> Regards,
> Ravi

Re: Issue with BulkWriter

Posted by Ravi Bhushan Ratnakar <ra...@gmail.com>.
Hi,

It seems like you want to use "com.hadoop.compression.lzo.LzoCodec"
instead of "com.hadoop.compression.lzo.*LzopCodec*" in the line below.

compressedStream = factory.getCodecByClassName("com.hadoop.compression.lzo.LzopCodec").createOutputStream(stream);


Regarding "lzop: unexpected end of file" problem, kindly add
"compressedStream.flush()" in the below method to flush any leftover data
before finishing.

public void finish() throws IOException {
  compressedStream.flush();
  compressedStream.finish();
}

https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/serialization/BulkWriter.html#finish--
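
Per that Javadoc, finish() must flush all buffered data, finish the
encoding, and write any footers, but must not close the underlying stream
(Flink closes the part file itself). Annotated, the corrected method looks
like this (the same change as above, just with comments):

public void finish() throws IOException {
    // Push any record bytes still buffered inside the codec to the stream.
    compressedStream.flush();
    // Let the codec write its end-of-stream trailer; without it the
    // decompressor sees a truncated file. Do NOT close the stream here.
    compressedStream.finish();
}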

Regards,
Ravi
