You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/08 20:40:38 UTC
[03/13] incubator-beam git commit: Port most of Combine to new DoFn
Port most of Combine to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/331f5234
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/331f5234
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/331f5234
Branch: refs/heads/master
Commit: 331f523461094af666a20bd97e1e15f1dec3feba
Parents: b1db02d
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Aug 5 12:11:11 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/transforms/Combine.java | 20 ++++++++++----------
1 file changed, 10 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/331f5234/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 6fc2324..a825800 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -1473,9 +1473,9 @@ public class Combine {
PCollection<OutputT> defaultIfEmpty = maybeEmpty.getPipeline()
.apply("CreateVoid", Create.of((Void) null).withCoder(VoidCoder.of()))
.apply("ProduceDefault", ParDo.withSideInputs(maybeEmptyView).of(
- new OldDoFn<Void, OutputT>() {
- @Override
- public void processElement(OldDoFn<Void, OutputT>.ProcessContext c) {
+ new DoFn<Void, OutputT>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
Iterator<OutputT> combined = c.sideInput(maybeEmptyView).iterator();
if (!combined.hasNext()) {
c.output(defaultValue);
@@ -2097,15 +2097,15 @@ public class Combine {
final TupleTag<KV<KV<K, Integer>, InputT>> hot = new TupleTag<>();
final TupleTag<KV<K, InputT>> cold = new TupleTag<>();
PCollectionTuple split = input.apply("AddNonce", ParDo.of(
- new OldDoFn<KV<K, InputT>, KV<K, InputT>>() {
+ new DoFn<KV<K, InputT>, KV<K, InputT>>() {
transient int counter;
- @Override
+ @StartBundle
public void startBundle(Context c) {
counter = ThreadLocalRandom.current().nextInt(
Integer.MAX_VALUE);
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
KV<K, InputT> kv = c.element();
int spread = Math.max(1, hotKeyFanout.apply(kv.getKey()));
@@ -2135,9 +2135,9 @@ public class Combine {
.setWindowingStrategyInternal(preCombineStrategy)
.apply("PreCombineHot", Combine.perKey(hotPreCombine))
.apply("StripNonce", ParDo.of(
- new OldDoFn<KV<KV<K, Integer>, AccumT>,
+ new DoFn<KV<KV<K, Integer>, AccumT>,
KV<K, InputOrAccum<InputT, AccumT>>>() {
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(KV.of(
c.element().getKey().getKey(),
@@ -2151,8 +2151,8 @@ public class Combine {
.get(cold)
.setCoder(inputCoder)
.apply("PrepareCold", ParDo.of(
- new OldDoFn<KV<K, InputT>, KV<K, InputOrAccum<InputT, AccumT>>>() {
- @Override
+ new DoFn<KV<K, InputT>, KV<K, InputOrAccum<InputT, AccumT>>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(KV.of(c.element().getKey(),
InputOrAccum.<InputT, AccumT>input(c.element().getValue())));