Posted to commits@beam.apache.org by "Davor Bonaci (JIRA)" <ji...@apache.org> on 2016/03/05 21:52:40 UTC
[jira] [Updated] (BEAM-96) Support composing combine functions
[ https://issues.apache.org/jira/browse/BEAM-96?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Davor Bonaci updated BEAM-96:
-----------------------------
Assignee: Pei He (was: Davor Bonaci)
> Support composing combine functions
> -----------------------------------
>
> Key: BEAM-96
> URL: https://issues.apache.org/jira/browse/BEAM-96
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-core
> Reporter: Pei He
> Assignee: Pei He
>
> The proposed API for composing combine functions is as follows:
>   pc.apply(
>       Combine.perKey(
>           CombineFns.composeKeyed()
>               .with(identityFn, new MaxIntegerFn(), maxLatencyTag)
>               .with(identityFn, new MeanFn<Integer>(), meanLatencyTag)));
> Example code:
>   PCollection<KV<K, Integer>> latencies = ...;
>
>   TupleTag<Integer> maxLatencyTag = new TupleTag<Integer>();
>   TupleTag<Double> meanLatencyTag = new TupleTag<Double>();
>
>   SimpleFunction<Integer, Integer> identityFn =
>       new SimpleFunction<Integer, Integer>() {
>         @Override
>         public Integer apply(Integer input) {
>           return input;
>         }};
>   PCollection<KV<K, CoCombineResult>> maxAndMean = latencies.apply(
>       Combine.perKey(
>           CombineFns.composeKeyed()
>               .with(identityFn, new MaxIntegerFn(), maxLatencyTag)
>               .with(identityFn, new MeanFn<Integer>(), meanLatencyTag)));
>
>   PCollection<T> finalResultCollection = maxAndMean
>       .apply(ParDo.of(
>           new DoFn<KV<K, CoCombineResult>, T>() {
>             @Override
>             public void processElement(ProcessContext c) throws Exception {
>               KV<K, CoCombineResult> e = c.element();
>               Integer maxLatency = e.getValue().get(maxLatencyTag);
>               Double meanLatency = e.getValue().get(meanLatencyTag);
>               .... Do Something ....
>               c.output(...some T...);
>             }
>           }));
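
To illustrate the idea behind the proposal, here is a minimal plain-Java sketch of how several combine functions might be run over the same input in one pass, with each result retrieved by a typed tag. It does not use the Beam SDK and is not the proposed implementation: every name in it (ComposedCombineSketch, Tag, SimpleCombineFn, Composed, Result, MaxIntFn, MeanIntFn) is a hypothetical stand-in, and it leaves out per-key grouping and accumulator merging, which a real distributed combine would need.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Plain-Java sketch of composed combine functions. Hypothetical names, not the Beam API. */
final class ComposedCombineSketch {

  /** Hypothetical typed key used to look up one component's output. */
  static final class Tag<T> {}

  /** Hypothetical minimal combine function: accumulate inputs, then extract an output. */
  interface SimpleCombineFn<InputT, AccumT, OutputT> {
    AccumT createAccumulator();
    AccumT addInput(AccumT accumulator, InputT input);
    OutputT extractOutput(AccumT accumulator);
  }

  /** Holds one output per tag; plays the role CoCombineResult has in the proposal. */
  static final class Result {
    private final Map<Tag<?>, Object> values = new HashMap<>();

    @SuppressWarnings("unchecked")
    <T> T get(Tag<T> tag) {
      return (T) values.get(tag); // safe: only Component.writeTo stores values, typed by tag
    }
  }

  /** One (extract function, combine function, tag) triple plus its running accumulator. */
  private static final class Component<DataT, InputT, AccumT, OutputT> {
    private final Function<DataT, InputT> extractFn;
    private final SimpleCombineFn<InputT, AccumT, OutputT> combineFn;
    private final Tag<OutputT> tag;
    private AccumT accumulator;

    Component(Function<DataT, InputT> extractFn,
        SimpleCombineFn<InputT, AccumT, OutputT> combineFn, Tag<OutputT> tag) {
      this.extractFn = extractFn;
      this.combineFn = combineFn;
      this.tag = tag;
    }

    void reset() { accumulator = combineFn.createAccumulator(); }

    void add(DataT element) {
      accumulator = combineFn.addInput(accumulator, extractFn.apply(element));
    }

    void writeTo(Result result) {
      result.values.put(tag, combineFn.extractOutput(accumulator));
    }
  }

  /** Feeds every element to every registered component in a single pass. */
  static final class Composed<DataT> {
    private final List<Component<DataT, ?, ?, ?>> components = new ArrayList<>();

    <InputT, AccumT, OutputT> Composed<DataT> with(Function<DataT, InputT> extractFn,
        SimpleCombineFn<InputT, AccumT, OutputT> combineFn, Tag<OutputT> tag) {
      components.add(new Component<>(extractFn, combineFn, tag));
      return this;
    }

    Result combine(Iterable<DataT> inputs) {
      for (Component<DataT, ?, ?, ?> c : components) c.reset();
      for (DataT element : inputs) {
        for (Component<DataT, ?, ?, ?> c : components) c.add(element);
      }
      Result result = new Result();
      for (Component<DataT, ?, ?, ?> c : components) c.writeTo(result);
      return result;
    }
  }

  /** Stand-in for MaxIntegerFn: tracks the largest input seen. */
  static final class MaxIntFn implements SimpleCombineFn<Integer, Integer, Integer> {
    public Integer createAccumulator() { return Integer.MIN_VALUE; }
    public Integer addInput(Integer acc, Integer in) { return Math.max(acc, in); }
    public Integer extractOutput(Integer acc) { return acc; }
  }

  /** Stand-in for MeanFn<Integer>: the accumulator is {sum, count}. */
  static final class MeanIntFn implements SimpleCombineFn<Integer, long[], Double> {
    public long[] createAccumulator() { return new long[] {0, 0}; }
    public long[] addInput(long[] acc, Integer in) { acc[0] += in; acc[1]++; return acc; }
    public Double extractOutput(long[] acc) {
      return acc[1] == 0 ? Double.NaN : (double) acc[0] / acc[1];
    }
  }

  public static void main(String[] args) {
    Tag<Integer> maxLatencyTag = new Tag<>();
    Tag<Double> meanLatencyTag = new Tag<>();
    Result r = new Composed<Integer>()
        .with(Function.identity(), new MaxIntFn(), maxLatencyTag)
        .with(Function.identity(), new MeanIntFn(), meanLatencyTag)
        .combine(Arrays.asList(10, 20, 60));
    System.out.println(r.get(maxLatencyTag) + " " + r.get(meanLatencyTag)); // prints "60 30.0"
  }
}
```

The typed tags are what let a single result object carry outputs of different types (Integer and Double here) while keeping the lookups type-checked at the call site, which appears to be the role TupleTag plays in the proposal.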