You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Heejong Lee <he...@google.com> on 2020/01/02 23:31:14 UTC

Re: External transform API in Java SDK

If we pass in TypeDescriptor objects instead of Java type information for
the compiler, we could match the returning coders and the given type
descriptors at pipeline construction time. It would be helpful to prevent
pipeline failing by class cast exception in runners. I've create the jira
ticket: https://issues.apache.org/jira/browse/BEAM-9048

On Mon, Dec 30, 2019 at 10:27 AM Luke Cwik <lc...@google.com> wrote:

>
>
> On Mon, Dec 23, 2019 at 12:20 PM Heejong Lee <he...@google.com> wrote:
>
>>
>>
>> On Fri, Dec 20, 2019 at 11:38 AM Luke Cwik <lc...@google.com> wrote:
>>
>>> What do side inputs look like?
>>>
>>
>> A user needs to first pass PCollections for side inputs into the external
>> transform in addition to ordinary input PCollections and define
>> PCollectionViews inside the external transform something like:
>>
>> PCollectionTuple pTuple =
>>     PCollectionTuple.of("main1", main1)
>>         .and("main2", main2)
>>         .and("side", side)
>>         .apply(External.of(...).withMultiOutputs());
>>
>> public static class TestTransform extends PTransform<PCollectionTuple, PCollectionTuple> {
>>   @Override
>>   public PCollectionTuple expand(PCollectionTuple input) {
>>     PCollectionView<String> sideView = input.<String>get("side").apply(View.asSingleton());
>>     PCollection<String> main =
>>         PCollectionList.<String>of(input.get("main1"))
>>             .and(input.get("main2"))
>>             .apply(Flatten.pCollections())
>>             .apply(
>>                 ParDo.of(
>>                         new DoFn<String, String>() {
>>                           @ProcessElement
>>                           public void processElement(
>>                               @Element String x,
>>                               OutputReceiver<String> out,
>>                               DoFn<String, String>.ProcessContext c) {
>>                             out.output(x + c.sideInput(sideView));
>>                           }
>>                         })
>>                     .withSideInputs(sideView));
>>
>>
>>
>>> On Thu, Dec 19, 2019 at 4:39 PM Heejong Lee <he...@google.com> wrote:
>>>
>>>> I wanted to know if anybody has any comment on external transform API
>>>> for Java SDK.
>>>>
>>>> `External.of()` can create external transform for Java SDK. Depending
>>>> on input and output types, two additional methods are provided:
>>>> `withMultiOutputs()` which specifies the type of PCollection and
>>>> `withOutputType()` which specifies the type of output element. Some
>>>> examples are:
>>>>
>>>> PCollection<String> col =
>>>>     testPipeline
>>>>         .apply(Create.of("1", "2", "3"))
>>>>         .apply(External.of(*...*));
>>>>
>>>> This is okay without additional methods since 1) input and output types
>>>> of external transform can be inferred 2) output PCollection is singular.
>>>>
>>>
>>> How does the type/coder at runtime get inferred (doesn't java's type
>>> erasure get rid of this information)?
>>>
>>
>>>
>>>> PCollectionTuple pTuple =
>>>>     testPipeline
>>>>         .apply(Create.of(1, 2, 3, 4, 5, 6))
>>>>         .apply(
>>>>             External.of(*...*).withMultiOutputs());
>>>>
>>>> This requires `withMultiOutputs()` since output PCollection is
>>>> PCollectionTuple.
>>>>
>>>
>>> Shouldn't this require a mapping from "output" name to coder/type
>>> variable to be specified as an argument to withMultiOutputs?
>>>
>>>
>>>> PCollection<String> pCol =
>>>>     testPipeline
>>>>         .apply(Create.of("1", "2", "2", "3", "3", "3"))
>>>>         .apply(
>>>>             External.of(...)
>>>>                 .<KV<String, Long>>withOutputType())
>>>>         .apply(
>>>>             "toString",
>>>>             MapElements.into(TypeDescriptors.strings()).via(                x -> String.format("%s->%s", x.getKey(), x.getValue())));
>>>>
>>>>  This requires `withOutputType()` since the output element type cannot
>>>> be inferred from method chaining. I think some users may feel awkward to
>>>> call method only with the type parameter and empty parenthesis. Without
>>>> `withOutputType()`, the type of output element will be java.lang.Object
>>>> which might still be forcefully casted to KV.
>>>>
>>>
>>> How does the output type get preserved in this case (since Java's type
>>> erasure would remove <KV<String, Long>> after compilation and coder
>>> inference in my opinion should be broken and or choosing something generic
>>> like serializable)?
>>>
>>
>> The expansion service is responsible for using cross-language compatible
>> coders in the returning expanded transforms and these are the coders used
>> in the runtime. Type information annotated by additional methods here is
>> for compile-time type safety of external transforms.
>>
>
> Note that *.<KV<String, Long>>withOutputType()* could be changed to
> *.<String>withOutputType()* and we would get a *PCollection<String>*
> since *withOutputType* doesn't actually do anything at runtime and is
> just to make types align during compilation.
>
> Is there a way to ensure that the output type is actually compatible with
> the coder that was returned after expansion (this would likely require you
> to pass in typing information into *withOutputType*, see
> TypeDescriptors[1])?
>
> 1:
> https://github.com/apache/beam/blob/4c18cb4ada2650552a0006dfffd68d0775dd76c6/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
>
>
>>>
>> Thanks,
>>>> Heejong
>>>>
>>>