You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "Davor Bonaci (JIRA)" <ji...@apache.org> on 2016/03/05 21:52:40 UTC

[jira] [Updated] (BEAM-96) Support composing combine functions

     [ https://issues.apache.org/jira/browse/BEAM-96?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Davor Bonaci updated BEAM-96:
-----------------------------
    Assignee: Pei He  (was: Davor Bonaci)

> Support composing combine functions
> -----------------------------------
>
>                 Key: BEAM-96
>                 URL: https://issues.apache.org/jira/browse/BEAM-96
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-java-core
>            Reporter: Pei He
>            Assignee: Pei He
>
> The proposal of composed combine functions is following:
> pc.apply(
>     Combine.perKey(
>          CombineFns.composeKeyed()
>             .with(identityFn, new MaxIntegerFn(), maxLatencyTag)
>             .with(identityFn, new MeanFn<Integer>(), meanLatencyTag)));
> Example code:
>    * PCollection<KV<K, Integer>> latencies = ...;
>    *
>    * TupleTag<Integer> maxLatencyTag = new TupleTag<Integer>();
>    * TupleTag<Double> meanLatencyTag = new TupleTag<Double>();
>    *
>    * SimpleFunction<Integer, Integer> identityFn =
>    *     new SimpleFunction<Integer, Integer>() {
>    *       @Override
>    *       public Integer apply(Integer input) {
>    *           return input;
>    *       }};
>    * PCollection<KV<K, CoCombineResult>> maxAndMean = latencies.apply(
>    *     Combine.perKey(
>    *         CombineFns.composeKeyed()
>    *            .with(identityFn, new MaxIntegerFn(), maxLatencyTag)
>    *            .with(identityFn, new MeanFn<Integer>(), meanLatencyTag)));
>    *
>    * PCollection<T> finalResultCollection = maxAndMean
>    *     .apply(ParDo.of(
>    *         new DoFn<KV<K, CoCombineResult>, T>() {
>    *           @Override
>    *           public void processElement(ProcessContext c) throws Exception {
>    *             KV<K, CoCombineResult> e = c.element();
>    *             Integer maxLatency = e.getValue().get(maxLatencyTag);
>    *             Double meanLatency = e.getValue().get(meanLatencyTag);
>    *             .... Do Something ....
>    *             c.output(...some T...);
>    *           }
>    *         }));



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)