Posted to user@flink.apache.org by Liu Bo <di...@gmail.com> on 2019/07/27 03:07:10 UTC

How to write value only using flink's SequenceFileWriter?

Dear Flink users,

We're trying to switch from StringWriter to SequenceFileWriter to turn on
compression. StringWriter writes only the value, and we want to keep it
that way.

AFAIK, you can use NullWritable as the key in Hadoop writers to suppress
the key so that only the values are written.

So I tried NullWritable, as in the following code:

   BucketingSink<Tuple2<NullWritable, Text>> hdfsSink =
           new BucketingSink<>("/data/cjv");

   hdfsSink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd/HH", ZoneOffset.UTC));
   hdfsSink.setWriter(new SequenceFileWriter<NullWritable, Text>(
           "org.apache.hadoop.io.compress.SnappyCodec",
           SequenceFile.CompressionType.BLOCK));
   hdfsSink.setBatchSize(1024 * 1024 * 250);          // roll part files at 250 MB
   hdfsSink.setBatchRolloverInterval(20 * 60 * 1000); // or after 20 minutes

   joinedResults.map(new MapFunction<Tuple2<String, String>, Tuple2<NullWritable, Text>>() {
       @Override
       public Tuple2<NullWritable, Text> map(Tuple2<String, String> value) throws Exception {
           // NullWritable as the key, so only the value should end up in the file
           return Tuple2.of(NullWritable.get(), new Text(value.f1));
       }
   }).addSink(hdfsSink).name("hdfs_sink").uid("hdfs_sink");


But the output file shows the key rendered as the string (null), e.g.:

    (null)	{"ts":1564168038,"os":"android",...}


So my question is: how do I omit the key completely and write only the
value with SequenceFileWriter?

Your help would be much appreciated.


-- 
All the best

Liu Bo

Re:Re: How to write value only using flink's SequenceFileWriter?

Posted by Haibo Sun <su...@163.com>.
Hi Liu Bo,


If you haven't customized serializations through the configuration item "io.serializations", the default serializer for Writable objects is org.apache.hadoop.io.serializer.WritableSerialization.WritableSerializer. As you said, when WritableSerializer serializes a NullWritable object, it doesn't actually write anything. So I suspect that the "(null)" you saw may be part of the value, not the key.
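
A quick way to see this is a small standalone sketch (assuming only hadoop-common on the classpath; the class name is just for illustration) that serializes the key by hand:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;

import org.apache.hadoop.io.NullWritable;

public class NullWritableCheck {
    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // NullWritable's write() is a no-op, so no key bytes reach the stream
        NullWritable.get().write(new DataOutputStream(buffer));
        System.out.println("key bytes written: " + buffer.size()); // prints 0
    }
}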




Best,
Haibo

At 2019-07-27 11:43:47, "Liu Bo" <di...@gmail.com> wrote:

The file header says the key is NullWritable:

    SEQ^F!org.apache.hadoop.io.NullWritable^Yorg.apache.hadoop.io.Text^A^A)org.apache.hadoop.io.compress.SnappyCodec

Might this be a hadoop -text display problem?



Re: How to write value only using flink's SequenceFileWriter?

Posted by Liu Bo <di...@gmail.com>.
The file header says the key is NullWritable:

SEQ^F!org.apache.hadoop.io.NullWritable^Yorg.apache.hadoop.io.Text^A^A)org.apache.hadoop.io.compress.SnappyCodec

Might this be a hadoop -text display problem?
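
As far as I can tell, NullWritable.toString() returns the literal string "(null)", and hadoop -text prints key.toString(), a tab, then the value for each record, which would explain the output above even though no key bytes are stored. A minimal reader sketch to double-check what the file actually contains (assuming the Hadoop client jars and the native Snappy library are available; class and argument names are just for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class DumpSequenceFileValues {
    public static void main(String[] args) throws Exception {
        // args[0] is the path to one of the rolled bucket files
        try (SequenceFile.Reader reader = new SequenceFile.Reader(
                new Configuration(),
                SequenceFile.Reader.file(new Path(args[0])))) {
            NullWritable key = NullWritable.get();
            Text value = new Text();
            while (reader.next(key, value)) {
                System.out.println(value); // value only, no "(null)" prefix
            }
        }
    }
}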


-- 
All the best

Liu Bo