Posted to commits@beam.apache.org by "Pei He (JIRA)" <ji...@apache.org> on 2016/03/04 23:01:40 UTC
[jira] [Created] (BEAM-96) Support composing combine functions
Pei He created BEAM-96:
--------------------------
Summary: Support composing combine functions
Key: BEAM-96
URL: https://issues.apache.org/jira/browse/BEAM-96
Project: Beam
Issue Type: New Feature
Components: sdk-java-core
Reporter: Pei He
Assignee: Davor Bonaci
The proposed API for composing combine functions is as follows:
  pc.apply(
      Combine.perKey(
          CombineFns.composeKeyed()
              .with(identityFn, new MaxIntegerFn(), maxLatencyTag)
              .with(identityFn, new MeanFn<Integer>(), meanLatencyTag)));
Example code:
  PCollection<KV<K, Integer>> latencies = ...;

  TupleTag<Integer> maxLatencyTag = new TupleTag<Integer>();
  TupleTag<Double> meanLatencyTag = new TupleTag<Double>();

  SimpleFunction<Integer, Integer> identityFn =
      new SimpleFunction<Integer, Integer>() {
        @Override
        public Integer apply(Integer input) {
          return input;
        }};
  PCollection<KV<K, CoCombineResult>> maxAndMean = latencies.apply(
      Combine.perKey(
          CombineFns.composeKeyed()
              .with(identityFn, new MaxIntegerFn(), maxLatencyTag)
              .with(identityFn, new MeanFn<Integer>(), meanLatencyTag)));

  PCollection<T> finalResultCollection = maxAndMean
      .apply(ParDo.of(
          new DoFn<KV<K, CoCombineResult>, T>() {
            @Override
            public void processElement(ProcessContext c) throws Exception {
              KV<K, CoCombineResult> e = c.element();
              Integer maxLatency = e.getValue().get(maxLatencyTag);
              Double meanLatency = e.getValue().get(meanLatencyTag);
              .... Do Something ....
              c.output(...some T...);
            }
          }));
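To illustrate the idea behind the proposal without depending on the SDK, here is a minimal plain-Java sketch. It is not the proposed implementation. Every type in it (Tag, CombineFn, Result, Composed, MaxFn, MeanFn) is a hypothetical stand-in invented for this illustration. It combines a single in-memory list rather than per key, and it omits accumulator merging. The point it tries to show is that a composed function keeps one accumulator per component, feeds each input to all of them in a single pass, and returns the outputs keyed by tag.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Plain-Java sketch of a composed combine; none of these types are SDK classes. */
final class ComposedCombineSketch {

    /** Hypothetical stand-in for TupleTag: a typed key identifying one sub-result. */
    static final class Tag<V> {}

    /** Hypothetical, simplified combine function (no accumulator merging). */
    interface CombineFn<I, A, O> {
        A createAccumulator();
        A addInput(A accumulator, I input);
        O extractOutput(A accumulator);
    }

    /** Hypothetical stand-in for CoCombineResult: one output per tag. */
    static final class Result {
        private final Map<Tag<?>, Object> values = new HashMap<>();

        @SuppressWarnings("unchecked")
        <V> V get(Tag<V> tag) {
            return (V) values.get(tag);
        }
    }

    /** One registered (extract function, combine function, tag) triple. */
    private static final class Component<T, I, A, O> {
        final Function<T, I> extract;
        final CombineFn<I, A, O> fn;
        final Tag<O> tag;

        Component(Function<T, I> extract, CombineFn<I, A, O> fn, Tag<O> tag) {
            this.extract = extract;
            this.fn = fn;
            this.tag = tag;
        }

        Object create() { return fn.createAccumulator(); }

        @SuppressWarnings("unchecked")
        Object add(Object acc, T input) { return fn.addInput((A) acc, extract.apply(input)); }

        @SuppressWarnings("unchecked")
        void emit(Object acc, Result out) { out.values.put(tag, fn.extractOutput((A) acc)); }
    }

    /** Runs every registered combine function over the same inputs in one pass. */
    static final class Composed<T> {
        private final List<Component<T, ?, ?, ?>> components = new ArrayList<>();

        <I, A, O> Composed<T> with(Function<T, I> extract, CombineFn<I, A, O> fn, Tag<O> tag) {
            components.add(new Component<T, I, A, O>(extract, fn, tag));
            return this;
        }

        Result combine(Iterable<T> inputs) {
            // The composite accumulator is simply one accumulator per component.
            Object[] accs = new Object[components.size()];
            for (int i = 0; i < accs.length; i++) accs[i] = components.get(i).create();
            for (T input : inputs) {
                for (int i = 0; i < accs.length; i++) accs[i] = components.get(i).add(accs[i], input);
            }
            Result out = new Result();
            for (int i = 0; i < accs.length; i++) components.get(i).emit(accs[i], out);
            return out;
        }
    }

    /** Maximum of the inputs; yields Integer.MIN_VALUE for empty input. */
    static final class MaxFn implements CombineFn<Integer, Integer, Integer> {
        public Integer createAccumulator() { return Integer.MIN_VALUE; }
        public Integer addInput(Integer acc, Integer input) { return Math.max(acc, input); }
        public Integer extractOutput(Integer acc) { return acc; }
    }

    /** Arithmetic mean; the accumulator is {sum, count}; yields NaN for empty input. */
    static final class MeanFn implements CombineFn<Integer, long[], Double> {
        public long[] createAccumulator() { return new long[] {0L, 0L}; }
        public long[] addInput(long[] acc, Integer input) { acc[0] += input; acc[1]++; return acc; }
        public Double extractOutput(long[] acc) {
            return acc[1] == 0 ? Double.NaN : (double) acc[0] / acc[1];
        }
    }

    public static void main(String[] args) {
        Tag<Integer> maxLatencyTag = new Tag<>();
        Tag<Double> meanLatencyTag = new Tag<>();
        Result result = new Composed<Integer>()
            .with(x -> x, new MaxFn(), maxLatencyTag)
            .with(x -> x, new MeanFn(), meanLatencyTag)
            .combine(Arrays.asList(120, 80, 250));
        // prints "250 150.0"
        System.out.println(result.get(maxLatencyTag) + " " + result.get(meanLatencyTag));
    }
}
```

In this sketch the tag is what keeps the lookup typed: get(maxLatencyTag) returns an Integer and get(meanLatencyTag) returns a Double, mirroring how the example above reads values out of CoCombineResult.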