Posted to user@beam.apache.org by Jesse Anderson <je...@smokinghand.com> on 2016/11/07 23:56:06 UTC

PCollection to PCollection Conversion

Is there a way to directly take a PCollection<KV> and make it a
PCollection<String>? I need to make the PCollection a PCollection<String>
before writing it out with TextIO.Write.

I tried using:
withCoder(KvCoder.of(StringDelegateCoder.of(String.class),
StringDelegateCoder.of(Long.class)))

but that causes binary data to be written out by the KV coder.

The only way appears to be a manual transform with:
PCollection<String> stringCounts = counts.apply(MapElements
    .via((KV<String, Long> count) ->
    count.getKey() + ":" + count.getValue())
    .withOutputType(TypeDescriptors.strings()));

If this is missing, that manual step should be baked into the API. It
should be either something in StringDelegateCoder or a new String
transform. The new StringDelegateCoder method would take in any KV (or list
type) and insert a specific String delimiter. The new transform would take in
any type in a PCollection<T> and make it a PCollection<String> using a
specific String delimiter.
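
A minimal plain-Java sketch of the delimiter-based formatting proposed above. DelimitedStrings, pair, and items are hypothetical names, not Beam API, and Map.Entry stands in for Beam's KV so the example runs without the SDK. In a pipeline, functions like these would be called from a MapElements step such as the one shown earlier.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not part of Beam): per-element formatting functions
// that turn a key/value pair or a list-like value into delimited text.
final class DelimitedStrings {
    private DelimitedStrings() {}

    // Joins a key and its value with the given delimiter.
    static <K, V> String pair(Map.Entry<K, V> entry, String delimiter) {
        return entry.getKey() + delimiter + entry.getValue();
    }

    // Joins every item of an iterable with the given delimiter.
    static String items(Iterable<?> values, String delimiter) {
        StringJoiner joiner = new StringJoiner(delimiter);
        for (Object value : values) {
            joiner.add(String.valueOf(value));
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        // Map.Entry is an assumption standing in for KV<String, Long>.
        Map.Entry<String, Long> count = new SimpleImmutableEntry<>("beam", 3L);
        System.out.println(pair(count, ":"));                    // prints "beam:3"
        System.out.println(items(Arrays.asList(1, 2, 3), ","));  // prints "1,2,3"
    }
}
```

Keeping the delimiter as a parameter means the same function can serve colon-separated, comma-separated, or tab-separated output.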

Thanks,

Jesse

Re: PCollection to PCollection Conversion

Posted by Dan Halperin <dh...@google.com>.
I think what you wrote is perfectly reasonable. Slightly more general,
maybe:

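import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;

class KvToString<K, V> extends SimpleFunction<KV<K, V>, String> {
    @Override  // apply must be an instance method, not static
    public String apply(KV<K, V> input) {
        return String.format("%s:%s", input.getKey(), input.getValue());
    }
}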

Then this is reusable in MapElements across KVs of different types. But
it's effectively the same code you wrote.

We don't generally add a lot of utilities that replace 3-liners. I would,
however, be a fan of generic ToString, because that seems like it would be
used everywhere:

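import org.apache.beam.sdk.transforms.SimpleFunction;

class ToStringFn<InputT> extends SimpleFunction<InputT, String> {
    @Override  // apply must be an instance method, not static
    public String apply(InputT input) {
        return input.toString();
    }
}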

or maybe a PTransform<PCollection<T>, PCollection<String>> that
internalizes that SimpleFunction.

(caveat: I didn't actually compile or run any of that code, just typed it
in gmail :p).

Dan

Re: PCollection to PCollection Conversion

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Good idea, Jesse!

Re: PCollection to PCollection Conversion

Posted by Jesse Anderson <je...@smokinghand.com>.
Moving this thread to dev mailing list for discussion.

Re: PCollection to PCollection Conversion

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Hi Jesse,

A Coder is for serialization, not for type conversion.
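
To make that distinction concrete, here is a small plain-Java sketch. It does not use Beam and does not reproduce Beam's wire format; EncodeVersusFormat, encode, and format are hypothetical names, and DataOutputStream is only a stand-in for a coder. The point is that a serializer emits length prefixes and raw bytes so the value can be decoded again, whereas a conversion produces the text a person would want in an output file.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical illustration (not Beam code): serializing a pair versus
// converting it to human-readable text.
final class EncodeVersusFormat {
    private EncodeVersusFormat() {}

    // Stand-in for a coder: writes bytes meant to be decoded, not read.
    static byte[] encode(String key, long value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(key);     // 2-byte length prefix, then the characters
            out.writeLong(value);  // 8 raw bytes, not decimal digits
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Type conversion: the text form intended for a line-oriented sink.
    static String format(String key, long value) {
        return key + ":" + value;
    }

    public static void main(String[] args) {
        System.out.println(encode("beam", 3L).length);  // prints 14
        System.out.println(format("beam", 3L));         // prints "beam:3"
    }
}
```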

I'm using the same approach as you:

https://github.com/jbonofre/beam-samples/blob/master/EventsByLocation/src/main/java/org/apache/beam/samples/EventsByLocation.java#L111

with a SimpleFunction (that I can reuse in different MapElements calls).

I had the same need in a different situation (for example, having a
PCollection<Foo> and wanting a PCollection<String> by just calling toString()
on Foo). I think it could be helpful to have a TypeConverter like the one we
have in Apache Camel.
A list of (implicit) TypeConverters could be present in the Pipeline
context, as something like:

Element Source Type -> Element Target Type -> TypeConverter

(Of course, a user could add their own type converter with a source/target
type.)

Implicitly, when we have a PCollection<Source> and want a
PCollection<Target>, the type converter can be called.

A TypeConverter could basically be a PTransform.
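
The lookup described above (source type, then target type, then converter) could be sketched roughly as follows. This is plain Java and only an assumption about how such a registry might look: TypeConverterRegistry, register, and convert are hypothetical names, not Beam or Camel API, and a java.util.function.Function stands in for the PTransform that a real converter might be.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical registry (not Beam or Camel API):
// source type -> target type -> converter.
final class TypeConverterRegistry {
    private final Map<Class<?>, Map<Class<?>, Function<Object, Object>>> converters =
        new HashMap<>();

    // Registers a converter for an exact source/target type pair.
    <S, T> void register(Class<S> source, Class<T> target,
                         Function<? super S, ? extends T> converter) {
        converters
            .computeIfAbsent(source, k -> new HashMap<>())
            .put(target, value -> converter.apply(source.cast(value)));
    }

    // Looks up a converter by the value's runtime class and the requested
    // target. Only exact class matches are found; supertypes are not searched.
    <S, T> T convert(S value, Class<T> target) {
        Map<Class<?>, Function<Object, Object>> byTarget = converters.get(value.getClass());
        Function<Object, Object> converter = byTarget == null ? null : byTarget.get(target);
        if (converter == null) {
            throw new IllegalArgumentException(
                "No converter from " + value.getClass().getName() + " to " + target.getName());
        }
        return target.cast(converter.apply(value));
    }

    public static void main(String[] args) {
        TypeConverterRegistry registry = new TypeConverterRegistry();
        registry.register(Long.class, String.class, Object::toString);
        System.out.println(registry.convert(42L, String.class));  // prints "42"
    }
}
```

A user-supplied converter is just another register call with its own source/target pair, which matches the extension point mentioned above.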

Just thinking out loud ;)

Regards
JB

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com