You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/01 02:25:41 UTC
[5/6] beam git commit: Remove KeyedCombineFn
Remove KeyedCombineFn
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7e04924e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7e04924e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7e04924e
Branch: refs/heads/master
Commit: 7e04924ee7b31e28326f761618173749a55789d0
Parents: a198f8d
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Apr 21 14:04:02 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Apr 30 18:17:42 2017 -0700
----------------------------------------------------------------------
.../translation/utils/ApexStateInternals.java | 38 +-
.../runners/core/GlobalCombineFnRunner.java | 78 +++
.../runners/core/GlobalCombineFnRunners.java | 193 ++++++
.../runners/core/InMemoryStateInternals.java | 50 +-
.../runners/core/PerKeyCombineFnRunner.java | 79 ---
.../runners/core/PerKeyCombineFnRunners.java | 161 -----
.../org/apache/beam/runners/core/StateTag.java | 18 +-
.../org/apache/beam/runners/core/StateTags.java | 43 +-
.../beam/runners/core/SystemReduceFn.java | 15 +-
.../beam/runners/core/ReduceFnRunnerTest.java | 36 +-
.../beam/runners/core/ReduceFnTester.java | 15 +-
.../apache/beam/runners/core/StateTagTest.java | 22 +-
.../CopyOnAccessInMemoryStateInternals.java | 66 +-
.../CopyOnAccessInMemoryStateInternalsTest.java | 34 -
.../flink/FlinkBatchTransformTranslators.java | 9 +-
.../functions/AbstractFlinkCombineRunner.java | 44 +-
.../FlinkMergingNonShuffleReduceFunction.java | 10 +-
.../functions/FlinkPartialReduceFunction.java | 6 +-
.../functions/FlinkReduceFunction.java | 10 +-
.../functions/SortingFlinkCombineRunner.java | 1 -
.../state/FlinkBroadcastStateInternals.java | 173 ++---
.../state/FlinkKeyGroupStateInternals.java | 119 ++--
.../state/FlinkSplitStateInternals.java | 119 ++--
.../streaming/state/FlinkStateInternals.java | 173 ++---
.../spark/stateful/SparkStateInternals.java | 40 +-
.../spark/translation/SparkKeyedCombineFn.java | 26 +-
.../spark/translation/TransformTranslator.java | 44 +-
.../streaming/StreamingTransformTranslator.java | 4 +-
.../runners/spark/SparkRunnerDebuggerTest.java | 7 +-
.../src/main/resources/beam/findbugs-filter.xml | 2 +-
.../sdk/transforms/ApproximateQuantiles.java | 8 +-
.../beam/sdk/transforms/ApproximateUnique.java | 3 +-
.../org/apache/beam/sdk/transforms/Combine.java | 672 +++++--------------
.../beam/sdk/transforms/CombineFnBase.java | 136 ----
.../apache/beam/sdk/transforms/CombineFns.java | 448 +------------
.../beam/sdk/transforms/CombineWithContext.java | 174 +----
.../org/apache/beam/sdk/transforms/Top.java | 6 +-
.../org/apache/beam/sdk/transforms/View.java | 2 +-
.../apache/beam/sdk/util/AppliedCombineFn.java | 35 +-
.../org/apache/beam/sdk/util/CombineFnUtil.java | 123 ++--
.../apache/beam/sdk/util/state/StateBinder.java | 19 +-
.../apache/beam/sdk/util/state/StateSpecs.java | 177 ++---
.../beam/sdk/transforms/CombineFnsTest.java | 114 ++--
.../apache/beam/sdk/transforms/CombineTest.java | 213 +++---
.../apache/beam/sdk/transforms/ParDoTest.java | 2 +-
.../apache/beam/sdk/transforms/ViewTest.java | 2 +-
.../apache/beam/sdk/util/CombineFnUtilTest.java | 18 +-
47 files changed, 1178 insertions(+), 2609 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
index ec8f666..e682894 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
@@ -42,8 +42,7 @@ import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CoderUtils;
@@ -145,7 +144,7 @@ public class ApexStateInternals<K> implements StateInternals<K> {
address,
accumCoder,
key,
- combineFn.<K>asKeyedFn()
+ combineFn
);
}
@@ -158,24 +157,11 @@ public class ApexStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
- bindKeyedCombiningValue(
+ bindCombiningValueWithContext(
StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
- KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
- return new ApexCombiningState<>(
- namespace,
- address,
- accumCoder,
- key, combineFn);
- }
-
- @Override
- public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
- bindKeyedCombiningValueWithContext(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
- return bindKeyedCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
+ CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+ return bindCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
}
}
@@ -323,12 +309,12 @@ public class ApexStateInternals<K> implements StateInternals<K> {
extends AbstractState<AccumT>
implements CombiningState<InputT, AccumT, OutputT> {
private final K key;
- private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
+ private final CombineFn<InputT, AccumT, OutputT> combineFn;
private ApexCombiningState(StateNamespace namespace,
StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> coder,
- K key, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
+ K key, CombineFn<InputT, AccumT, OutputT> combineFn) {
super(namespace, address, coder);
this.key = key;
this.combineFn = combineFn;
@@ -341,13 +327,13 @@ public class ApexStateInternals<K> implements StateInternals<K> {
@Override
public OutputT read() {
- return combineFn.extractOutput(key, getAccum());
+ return combineFn.extractOutput(getAccum());
}
@Override
public void add(InputT input) {
AccumT accum = getAccum();
- combineFn.addInput(key, accum, input);
+ combineFn.addInput(accum, input);
writeValue(accum);
}
@@ -355,7 +341,7 @@ public class ApexStateInternals<K> implements StateInternals<K> {
public AccumT getAccum() {
AccumT accum = readValue();
if (accum == null) {
- accum = combineFn.createAccumulator(key);
+ accum = combineFn.createAccumulator();
}
return accum;
}
@@ -376,13 +362,13 @@ public class ApexStateInternals<K> implements StateInternals<K> {
@Override
public void addAccum(AccumT accum) {
- accum = combineFn.mergeAccumulators(key, Arrays.asList(getAccum(), accum));
+ accum = combineFn.mergeAccumulators(Arrays.asList(getAccum(), accum));
writeValue(accum);
}
@Override
public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
- return combineFn.mergeAccumulators(key, accumulators);
+ return combineFn.mergeAccumulators(accumulators);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunner.java
new file mode 100644
index 0000000..5325ba6
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunner.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.io.Serializable;
+import java.util.Collection;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.SideInputReader;
+
+/**
+ * An interface that runs a {@link GlobalCombineFn} with unified APIs.
+ *
+ * <p>Different combine functions have their own implementations. For example, the implementation
+ * can skip allocating {@code Combine.Context}, if the combine function doesn't use it.
+ */
+public interface GlobalCombineFnRunner<InputT, AccumT, OutputT> extends Serializable {
+ /**
+ * Forwards the call to a {@link GlobalCombineFn} to create the accumulator.
+ *
+ * <p>It constructs a {@code CombineWithContext.Context} from
+ * {@link PipelineOptions} and {@link SideInputReader} if it is required.
+ */
+ AccumT createAccumulator(PipelineOptions options,
+ SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
+
+ /**
+ * Forwards the call to a {@link GlobalCombineFn} to add the input.
+ *
+ * <p>It constructs a {@code CombineWithContext.Context} from
+ * {@link PipelineOptions} and {@link SideInputReader} if it is required.
+ */
+ AccumT addInput(AccumT accumulator, InputT value, PipelineOptions options,
+ SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
+
+ /**
+ * Forwards the call to a {@link GlobalCombineFn} to merge accumulators.
+ *
+ * <p>It constructs a {@code CombineWithContext.Context} from
+ * {@link PipelineOptions} and {@link SideInputReader} if it is required.
+ */
+ AccumT mergeAccumulators(Iterable<AccumT> accumulators, PipelineOptions options,
+ SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
+
+ /**
+ * Forwards the call to a {@link GlobalCombineFn} to extract the output.
+ *
+ * <p>It constructs a {@code CombineWithContext.Context} from
+ * {@link PipelineOptions} and {@link SideInputReader} if it is required.
+ */
+ OutputT extractOutput(AccumT accumulator, PipelineOptions options,
+ SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
+
+ /**
+ * Forwards the call to a {@link GlobalCombineFn} to compact the accumulator.
+ *
+ * <p>It constructs a {@code CombineWithContext.Context} from
+ * {@link PipelineOptions} and {@link SideInputReader} if it is required.
+ */
+ AccumT compact(AccumT accumulator, PipelineOptions options,
+ SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunners.java
new file mode 100644
index 0000000..d45b503
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunners.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.CombineContextFactory;
+import org.apache.beam.sdk.util.SideInputReader;
+
+/**
+ * Static utility methods that provide {@link GlobalCombineFnRunner} implementations for different
+ * combine functions.
+ */
+public class GlobalCombineFnRunners {
+ /** Returns a {@link GlobalCombineFnRunner} from a {@link GlobalCombineFn}. */
+ public static <InputT, AccumT, OutputT> GlobalCombineFnRunner<InputT, AccumT, OutputT> create(
+ GlobalCombineFn<InputT, AccumT, OutputT> globalCombineFn) {
+ if (globalCombineFn instanceof CombineFnWithContext) {
+ return new CombineFnWithContextRunner<>(
+ (CombineFnWithContext<InputT, AccumT, OutputT>) globalCombineFn);
+ } else if (globalCombineFn instanceof CombineFn) {
+ return new CombineFnRunner<>((CombineFn<InputT, AccumT, OutputT>) globalCombineFn);
+ } else {
+ throw new IllegalStateException(
+ String.format("Unknown type of CombineFn: %s", globalCombineFn.getClass()));
+ }
+ }
+
+ /**
+ * An implementation of {@link GlobalCombineFnRunner} with {@link CombineFn}.
+ *
+ * <p>It forwards function calls to the {@link CombineFn}.
+ */
+ private static class CombineFnRunner<InputT, AccumT, OutputT>
+ implements org.apache.beam.runners.core.GlobalCombineFnRunner<InputT, AccumT, OutputT> {
+ private final CombineFn<InputT, AccumT, OutputT> combineFn;
+
+ private CombineFnRunner(CombineFn<InputT, AccumT, OutputT> combineFn) {
+ this.combineFn = combineFn;
+ }
+
+ @Override
+ public String toString() {
+ return combineFn.toString();
+ }
+
+ @Override
+ public AccumT createAccumulator(
+ PipelineOptions options,
+ SideInputReader sideInputReader,
+ Collection<? extends BoundedWindow> windows) {
+ return combineFn.createAccumulator();
+ }
+
+ @Override
+ public AccumT addInput(
+ AccumT accumulator,
+ InputT input,
+ PipelineOptions options,
+ SideInputReader sideInputReader,
+ Collection<? extends BoundedWindow> windows) {
+ return combineFn.addInput(accumulator, input);
+ }
+
+ @Override
+ public AccumT mergeAccumulators(
+ Iterable<AccumT> accumulators,
+ PipelineOptions options,
+ SideInputReader sideInputReader,
+ Collection<? extends BoundedWindow> windows) {
+ return combineFn.mergeAccumulators(accumulators);
+ }
+
+ @Override
+ public OutputT extractOutput(
+ AccumT accumulator,
+ PipelineOptions options,
+ SideInputReader sideInputReader,
+ Collection<? extends BoundedWindow> windows) {
+ return combineFn.extractOutput(accumulator);
+ }
+
+ @Override
+ public AccumT compact(
+ AccumT accumulator,
+ PipelineOptions options,
+ SideInputReader sideInputReader,
+ Collection<? extends BoundedWindow> windows) {
+ return combineFn.compact(accumulator);
+ }
+ }
+
+ /**
+ * An implementation of {@link org.apache.beam.runners.core.GlobalCombineFnRunner} with {@link
+ * CombineFnWithContext}.
+ *
+ * <p>It forwards function calls to the {@link CombineFnWithContext}.
+ */
+ private static class CombineFnWithContextRunner<InputT, AccumT, OutputT>
+ implements org.apache.beam.runners.core.GlobalCombineFnRunner<InputT, AccumT, OutputT> {
+ private final CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext;
+
+ private CombineFnWithContextRunner(
+ CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext) {
+ this.combineFnWithContext = combineFnWithContext;
+ }
+
+ @Override
+ public String toString() {
+ return combineFnWithContext.toString();
+ }
+
+ @Override
+ public AccumT createAccumulator(
+ PipelineOptions options,
+ SideInputReader sideInputReader,
+ Collection<? extends BoundedWindow> windows) {
+ return combineFnWithContext.createAccumulator(
+ CombineContextFactory.createFromComponents(
+ options, sideInputReader, Iterables.getOnlyElement(windows)));
+ }
+
+ @Override
+ public AccumT addInput(
+ AccumT accumulator,
+ InputT input,
+ PipelineOptions options,
+ SideInputReader sideInputReader,
+ Collection<? extends BoundedWindow> windows) {
+ return combineFnWithContext.addInput(
+ accumulator,
+ input,
+ CombineContextFactory.createFromComponents(
+ options, sideInputReader, Iterables.getOnlyElement(windows)));
+ }
+
+ @Override
+ public AccumT mergeAccumulators(
+ Iterable<AccumT> accumulators,
+ PipelineOptions options,
+ SideInputReader sideInputReader,
+ Collection<? extends BoundedWindow> windows) {
+ return combineFnWithContext.mergeAccumulators(
+ accumulators,
+ CombineContextFactory.createFromComponents(
+ options, sideInputReader, Iterables.getOnlyElement(windows)));
+ }
+
+ @Override
+ public OutputT extractOutput(
+ AccumT accumulator,
+ PipelineOptions options,
+ SideInputReader sideInputReader,
+ Collection<? extends BoundedWindow> windows) {
+ return combineFnWithContext.extractOutput(
+ accumulator,
+ CombineContextFactory.createFromComponents(
+ options, sideInputReader, Iterables.getOnlyElement(windows)));
+ }
+
+ @Override
+ public AccumT compact(
+ AccumT accumulator,
+ PipelineOptions options,
+ SideInputReader sideInputReader,
+ Collection<? extends BoundedWindow> windows) {
+ return combineFnWithContext.compact(
+ accumulator,
+ CombineContextFactory.createFromComponents(
+ options, sideInputReader, Iterables.getOnlyElement(windows)));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
index 9fb8e3f..2c02282 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
@@ -31,8 +31,7 @@ import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CombineFnUtil;
@@ -152,7 +151,7 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
final CombineFn<InputT, AccumT, OutputT> combineFn) {
- return new InMemoryCombiningState<K, InputT, AccumT, OutputT>(key, combineFn.<K>asKeyedFn());
+ return new InMemoryCombiningState<>(combineFn);
}
@Override
@@ -164,20 +163,11 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
- bindKeyedCombiningValue(
+ bindCombiningValueWithContext(
StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
- KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
- return new InMemoryCombiningState<K, InputT, AccumT, OutputT>(key, combineFn);
- }
-
- @Override
- public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
- bindKeyedCombiningValueWithContext(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
- return bindKeyedCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
+ CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+ return bindCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
}
}
@@ -310,23 +300,21 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
/**
* An {@link InMemoryState} implementation of {@link CombiningState}.
*/
- public static final class InMemoryCombiningState<K, InputT, AccumT, OutputT>
+ public static final class InMemoryCombiningState<InputT, AccumT, OutputT>
implements CombiningState<InputT, AccumT, OutputT>,
- InMemoryState<InMemoryCombiningState<K, InputT, AccumT, OutputT>> {
- private final K key;
+ InMemoryState<InMemoryCombiningState<InputT, AccumT, OutputT>> {
private boolean isCleared = true;
- private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
+ private final CombineFn<InputT, AccumT, OutputT> combineFn;
private AccumT accum;
public InMemoryCombiningState(
- K key, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
- this.key = key;
+ CombineFn<InputT, AccumT, OutputT> combineFn) {
this.combineFn = combineFn;
- accum = combineFn.createAccumulator(key);
+ accum = combineFn.createAccumulator();
}
@Override
- public InMemoryCombiningState<K, InputT, AccumT, OutputT> readLater() {
+ public InMemoryCombiningState<InputT, AccumT, OutputT> readLater() {
return this;
}
@@ -334,19 +322,19 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
public void clear() {
// Even though we're clearing we can't remove this from the in-memory state map, since
// other users may already have a handle on this CombiningValue.
- accum = combineFn.createAccumulator(key);
+ accum = combineFn.createAccumulator();
isCleared = true;
}
@Override
public OutputT read() {
- return combineFn.extractOutput(key, accum);
+ return combineFn.extractOutput(accum);
}
@Override
public void add(InputT input) {
isCleared = false;
- accum = combineFn.addInput(key, accum, input);
+ accum = combineFn.addInput(accum, input);
}
@Override
@@ -371,12 +359,12 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
@Override
public void addAccum(AccumT accum) {
isCleared = false;
- this.accum = combineFn.mergeAccumulators(key, Arrays.asList(this.accum, accum));
+ this.accum = combineFn.mergeAccumulators(Arrays.asList(this.accum, accum));
}
@Override
public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
- return combineFn.mergeAccumulators(key, accumulators);
+ return combineFn.mergeAccumulators(accumulators);
}
@Override
@@ -385,9 +373,9 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
}
@Override
- public InMemoryCombiningState<K, InputT, AccumT, OutputT> copy() {
- InMemoryCombiningState<K, InputT, AccumT, OutputT> that =
- new InMemoryCombiningState<>(key, combineFn);
+ public InMemoryCombiningState<InputT, AccumT, OutputT> copy() {
+ InMemoryCombiningState<InputT, AccumT, OutputT> that =
+ new InMemoryCombiningState<>(combineFn);
if (!this.isCleared) {
that.isCleared = this.isCleared;
that.addAccum(accum);
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java
deleted file mode 100644
index a6608a7..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import java.io.Serializable;
-import java.util.Collection;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.SideInputReader;
-
-/**
- * An interface that runs a {@link PerKeyCombineFn} with unified APIs.
- *
- * <p>Different keyed combine functions have their own implementations.
- * For example, the implementation can skip allocating {@code Combine.Context},
- * if the keyed combine function doesn't use it.
- */
-public interface PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> extends Serializable {
- /**
- * Forwards the call to a {@link PerKeyCombineFn} to create the accumulator.
- *
- * <p>It constructs a {@code CombineWithContext.Context} from
- * {@link PipelineOptions} and {@link SideInputReader} if it is required.
- */
- AccumT createAccumulator(K key, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
-
- /**
- * Forwards the call to a {@link PerKeyCombineFn} to add the input.
- *
- * <p>It constructs a {@code CombineWithContext.Context} from
- * {@link PipelineOptions} and {@link SideInputReader} if it is required.
- */
- AccumT addInput(K key, AccumT accumulator, InputT value, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
-
- /**
- * Forwards the call to a {@link PerKeyCombineFn} to merge accumulators.
- *
- * <p>It constructs a {@code CombineWithContext.Context} from
- * {@link PipelineOptions} and {@link SideInputReader} if it is required.
- */
- AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
-
- /**
- * Forwards the call to a {@link PerKeyCombineFn} to extract the output.
- *
- * <p>It constructs a {@code CombineWithContext.Context} from
- * {@link PipelineOptions} and {@link SideInputReader} if it is required.
- */
- OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
-
- /**
- * Forwards the call to a {@link PerKeyCombineFn} to compact the accumulator.
- *
- * <p>It constructs a {@code CombineWithContext.Context} from
- * {@link PipelineOptions} and {@link SideInputReader} if it is required.
- */
- AccumT compact(K key, AccumT accumulator, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java
deleted file mode 100644
index 7736758..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import com.google.common.collect.Iterables;
-import java.util.Collection;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.CombineContextFactory;
-import org.apache.beam.sdk.util.SideInputReader;
-
-/**
- * Static utility methods that provide {@link PerKeyCombineFnRunner} implementations
- * for different keyed combine functions.
- */
-public class PerKeyCombineFnRunners {
- /**
- * Returns a {@link PerKeyCombineFnRunner} from a {@link PerKeyCombineFn}.
- */
- public static <K, InputT, AccumT, OutputT> PerKeyCombineFnRunner<K, InputT, AccumT, OutputT>
- create(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) {
- if (perKeyCombineFn instanceof KeyedCombineFnWithContext) {
- return new KeyedCombineFnWithContextRunner<>(
- (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) perKeyCombineFn);
- } else if (perKeyCombineFn instanceof KeyedCombineFn) {
- return new KeyedCombineFnRunner<>(
- (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn);
- } else {
- throw new IllegalStateException(
- String.format("Unknown type of CombineFn: %s", perKeyCombineFn.getClass()));
- }
- }
-
- /**
- * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFn}.
- *
- * <p>It forwards functions calls to the {@link KeyedCombineFn}.
- */
- private static class KeyedCombineFnRunner<K, InputT, AccumT, OutputT>
- implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
- private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn;
-
- private KeyedCombineFnRunner(
- KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) {
- this.keyedCombineFn = keyedCombineFn;
- }
-
- @Override
- public String toString() {
- return keyedCombineFn.toString();
- }
-
- @Override
- public AccumT createAccumulator(K key, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFn.createAccumulator(key);
- }
-
- @Override
- public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFn.addInput(key, accumulator, input);
- }
-
- @Override
- public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFn.mergeAccumulators(key, accumulators);
- }
-
- @Override
- public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFn.extractOutput(key, accumulator);
- }
-
- @Override
- public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFn.compact(key, accumulator);
- }
- }
-
- /**
- * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFnWithContext}.
- *
- * <p>It forwards functions calls to the {@link KeyedCombineFnWithContext}.
- */
- private static class KeyedCombineFnWithContextRunner<K, InputT, AccumT, OutputT>
- implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
- private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext;
-
- private KeyedCombineFnWithContextRunner(
- KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext) {
- this.keyedCombineFnWithContext = keyedCombineFnWithContext;
- }
-
- @Override
- public String toString() {
- return keyedCombineFnWithContext.toString();
- }
-
- @Override
- public AccumT createAccumulator(K key, PipelineOptions options, SideInputReader sideInputReader,
- Collection<? extends BoundedWindow> windows) {
- return keyedCombineFnWithContext.createAccumulator(key,
- CombineContextFactory.createFromComponents(
- options, sideInputReader, Iterables.getOnlyElement(windows)));
- }
-
- @Override
- public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFnWithContext.addInput(key, accumulator, input,
- CombineContextFactory.createFromComponents(
- options, sideInputReader, Iterables.getOnlyElement(windows)));
- }
-
- @Override
- public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFnWithContext.mergeAccumulators(key, accumulators,
- CombineContextFactory.createFromComponents(
- options, sideInputReader, Iterables.getOnlyElement(windows)));
- }
-
- @Override
- public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFnWithContext.extractOutput(key, accumulator,
- CombineContextFactory.createFromComponents(
- options, sideInputReader, Iterables.getOnlyElement(windows)));
- }
-
- @Override
- public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFnWithContext.compact(key, accumulator,
- CombineContextFactory.createFromComponents(
- options, sideInputReader, Iterables.getOnlyElement(windows)));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
index a5d262a..aaeecf0 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
@@ -23,8 +23,7 @@ import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
@@ -100,17 +99,10 @@ public interface StateTag<K, StateT extends State> extends Serializable {
CombineFn<InputT, AccumT, OutputT> combineFn);
<InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
- Coder<AccumT> accumCoder,
- KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn);
-
- <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
- Coder<AccumT> accumCoder,
- KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT>
- combineFn);
+ CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
+ Coder<AccumT> accumCoder,
+ CombineFnWithContext<InputT, AccumT, OutputT> combineFn);
/**
* Bind to a watermark {@link StateSpec}.
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
index 2b3f4b8..fe99f27 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
@@ -26,8 +26,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.state.BagState;
@@ -90,22 +89,12 @@ public class StateTags {
@Override
public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindKeyedCombining(
+ CombiningState<InputT, AccumT, OutputT> bindCombiningWithContext(
String id,
StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
- KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
- return binder.bindKeyedCombiningValue(tagForSpec(id, spec), accumCoder, combineFn);
- }
-
- @Override
- public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningWithContext(
- String id,
- StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
- Coder<AccumT> accumCoder,
- KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
- return binder.bindKeyedCombiningValueWithContext(
+ CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+ return binder.bindCombiningValueWithContext(
tagForSpec(id, spec), accumCoder, combineFn);
}
@@ -162,29 +151,17 @@ public class StateTags {
}
/**
- * Create a state tag for values that use a {@link KeyedCombineFn} to automatically merge
- * multiple {@code InputT}s into a single {@code OutputT}.
- */
- public static <K, InputT, AccumT,
- OutputT> StateTag<K, CombiningState<InputT, AccumT, OutputT>>
- keyedCombiningValue(String id, Coder<AccumT> accumCoder,
- KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
- return new SimpleStateTag<>(
- new StructuredId(id), StateSpecs.keyedCombining(accumCoder, combineFn));
- }
-
- /**
- * Create a state tag for values that use a {@link KeyedCombineFnWithContext} to automatically
+ * Create a state tag for values that use a {@link CombineFnWithContext} to automatically
* merge multiple {@code InputT}s into a single {@code OutputT}.
*/
- public static <K, InputT, AccumT, OutputT>
- StateTag<K, CombiningState<InputT, AccumT, OutputT>>
- keyedCombiningValueWithContext(
+ public static <InputT, AccumT, OutputT>
+ StateTag<Object, CombiningState<InputT, AccumT, OutputT>>
+ combiningValueWithContext(
String id,
Coder<AccumT> accumCoder,
- KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
+ CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
return new SimpleStateTag<>(
- new StructuredId(id), StateSpecs.keyedCombiningWithContext(accumCoder, combineFn));
+ new StructuredId(id), StateSpecs.combining(accumCoder, combineFn));
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
index f618d88..86a7fd7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
@@ -20,8 +20,7 @@ package org.apache.beam.runners.core;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.AppliedCombineFn;
@@ -71,18 +70,18 @@ public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends Bound
AccumT, OutputT, W>
combining(
final Coder<K> keyCoder, final AppliedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
- final StateTag<K, CombiningState<InputT, AccumT, OutputT>> bufferTag;
- if (combineFn.getFn() instanceof KeyedCombineFnWithContext) {
+ final StateTag<Object, CombiningState<InputT, AccumT, OutputT>> bufferTag;
+ if (combineFn.getFn() instanceof CombineFnWithContext) {
bufferTag = StateTags.makeSystemTagInternal(
- StateTags.<K, InputT, AccumT, OutputT>keyedCombiningValueWithContext(
+ StateTags.<InputT, AccumT, OutputT>combiningValueWithContext(
BUFFER_NAME, combineFn.getAccumulatorCoder(),
- (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) combineFn.getFn()));
+ (CombineFnWithContext<InputT, AccumT, OutputT>) combineFn.getFn()));
} else {
bufferTag = StateTags.makeSystemTagInternal(
- StateTags.<K, InputT, AccumT, OutputT>keyedCombiningValue(
+ StateTags.<InputT, AccumT, OutputT>combiningValue(
BUFFER_NAME, combineFn.getAccumulatorCoder(),
- (KeyedCombineFn<K, InputT, AccumT, OutputT>) combineFn.getFn()));
+ (CombineFn<InputT, AccumT, OutputT>) combineFn.getFn()));
}
return new SystemReduceFn<K, InputT, AccumT, OutputT, W>(bufferTag) {
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 44bc538..ec2e7a3 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -218,7 +218,7 @@ public class ReduceFnRunnerTest {
ReduceFnTester.combining(
strategy,
mockTriggerStateMachine,
- Sum.ofIntegers().<String>asKeyedFn(),
+ Sum.ofIntegers(),
VarIntCoder.of());
injectElement(tester, 2);
@@ -291,7 +291,7 @@ public class ReduceFnRunnerTest {
ReduceFnTester<Integer, Integer, IntervalWindow> tester =
ReduceFnTester
- .combining(strategy, Sum.ofIntegers().<String>asKeyedFn(), VarIntCoder.of());
+ .combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
tester.injectElements(TimestampedValue.of(13, elementTimestamp));
@@ -323,7 +323,7 @@ public class ReduceFnRunnerTest {
ReduceFnTester.combining(
strategy,
mockTriggerStateMachine,
- Sum.ofIntegers().<String>asKeyedFn(),
+ Sum.ofIntegers(),
VarIntCoder.of());
injectElement(tester, 1);
@@ -387,9 +387,14 @@ public class ReduceFnRunnerTest {
});
SumAndVerifyContextFn combineFn = new SumAndVerifyContextFn(mockView, expectedValue);
- ReduceFnTester<Integer, Integer, IntervalWindow> tester = ReduceFnTester.combining(
- mainInputWindowingStrategy, mockTriggerStateMachine, combineFn.<String>asKeyedFn(),
- VarIntCoder.of(), options, mockSideInputReader);
+ ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+ ReduceFnTester.combining(
+ mainInputWindowingStrategy,
+ mockTriggerStateMachine,
+ combineFn,
+ VarIntCoder.of(),
+ options,
+ mockSideInputReader);
when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
for (int i = 0; i < 8; ++i) {
@@ -1062,12 +1067,13 @@ public class ReduceFnRunnerTest {
*/
@Test
public void testDropDataMultipleWindowsFinishedTrigger() throws Exception {
- ReduceFnTester<Integer, Integer, IntervalWindow> tester = ReduceFnTester.combining(
- WindowingStrategy.of(
- SlidingWindows.of(Duration.millis(100)).every(Duration.millis(30)))
- .withTrigger(AfterWatermark.pastEndOfWindow())
- .withAllowedLateness(Duration.millis(1000)),
- Sum.ofIntegers().<String>asKeyedFn(), VarIntCoder.of());
+ ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+ ReduceFnTester.combining(
+ WindowingStrategy.of(SlidingWindows.of(Duration.millis(100)).every(Duration.millis(30)))
+ .withTrigger(AfterWatermark.pastEndOfWindow())
+ .withAllowedLateness(Duration.millis(1000)),
+ Sum.ofIntegers(),
+ VarIntCoder.of());
tester.injectElements(
// assigned to [-60, 40), [-30, 70), [0, 100)
@@ -1209,8 +1215,7 @@ public class ReduceFnRunnerTest {
.withAllowedLateness(Duration.millis(100));
ReduceFnTester<Integer, Integer, IntervalWindow> tester =
- ReduceFnTester
- .combining(strategy, Sum.ofIntegers().<String>asKeyedFn(), VarIntCoder.of());
+ ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
tester.advanceInputWatermark(new Instant(0));
tester.advanceProcessingTime(new Instant(0));
@@ -1265,8 +1270,7 @@ public class ReduceFnRunnerTest {
.withAllowedLateness(Duration.millis(100));
ReduceFnTester<Integer, Integer, IntervalWindow> tester =
- ReduceFnTester
- .combining(strategy, Sum.ofIntegers().<String>asKeyedFn(), VarIntCoder.of());
+ ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
tester.advanceInputWatermark(new Instant(0));
tester.advanceProcessingTime(new Instant(0));
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index b5b5492..dfb769f 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -53,8 +53,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -170,13 +169,13 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
/**
* Creates a {@link ReduceFnTester} for the given {@link WindowingStrategy} and
- * {@link KeyedCombineFn}, creating a {@link TriggerStateMachine} from the
+ * {@link CombineFn}, creating a {@link TriggerStateMachine} from the
* {@link Trigger} in the {@link WindowingStrategy}.
*/
public static <W extends BoundedWindow, AccumT, OutputT>
ReduceFnTester<Integer, OutputT, W> combining(
WindowingStrategy<?, W> strategy,
- KeyedCombineFn<String, Integer, AccumT, OutputT> combineFn,
+ CombineFn<Integer, AccumT, OutputT> combineFn,
Coder<OutputT> outputCoder)
throws Exception {
@@ -194,7 +193,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
/**
* Creates a {@link ReduceFnTester} for the given {@link WindowingStrategy},
- * {@link KeyedCombineFn}, and {@link TriggerStateMachine}, for mocking the interaction
+ * {@link CombineFn}, and {@link TriggerStateMachine}, for mocking the interaction
* between {@link ReduceFnRunner} and the {@link TriggerStateMachine}.
* Ignores the {@link Trigger} in the {@link WindowingStrategy}.
*/
@@ -202,7 +201,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
ReduceFnTester<Integer, OutputT, W> combining(
WindowingStrategy<?, W> strategy,
TriggerStateMachine triggerStateMachine,
- KeyedCombineFn<String, Integer, AccumT, OutputT> combineFn,
+ CombineFn<Integer, AccumT, OutputT> combineFn,
Coder<OutputT> outputCoder)
throws Exception {
@@ -223,7 +222,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
public static <W extends BoundedWindow, AccumT, OutputT>
ReduceFnTester<Integer, OutputT, W> combining(
WindowingStrategy<?, W> strategy,
- KeyedCombineFnWithContext<String, Integer, AccumT, OutputT> combineFn,
+ CombineFnWithContext<Integer, AccumT, OutputT> combineFn,
Coder<OutputT> outputCoder,
PipelineOptions options,
SideInputReader sideInputReader)
@@ -246,7 +245,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
ReduceFnTester<Integer, OutputT, W> combining(
WindowingStrategy<?, W> strategy,
TriggerStateMachine triggerStateMachine,
- KeyedCombineFnWithContext<String, Integer, AccumT, OutputT> combineFn,
+ CombineFnWithContext<Integer, AccumT, OutputT> combineFn,
Coder<OutputT> outputCoder,
PipelineOptions options,
SideInputReader sideInputReader)
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
index 10dcb62..9a8b75c 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
@@ -162,17 +162,17 @@ public class StateTagTest {
Coder<int[]> accum1 = maxFn.getAccumulatorCoder(registry, VarIntCoder.of());
Coder<int[]> accum2 = minFn.getAccumulatorCoder(registry, BigEndianIntegerCoder.of());
- StateTag<?, ?> fooCoder1Max1 = StateTags.keyedCombiningValueWithContext(
- "foo", accum1, CombineFnUtil.toFnWithContext(maxFn).<String>asKeyedFn());
- StateTag<?, ?> fooCoder1Max2 = StateTags.keyedCombiningValueWithContext(
- "foo", accum1, CombineFnUtil.toFnWithContext(maxFn).asKeyedFn());
- StateTag<?, ?> fooCoder1Min = StateTags.keyedCombiningValueWithContext(
- "foo", accum1, CombineFnUtil.toFnWithContext(minFn).asKeyedFn());
-
- StateTag<?, ?> fooCoder2Max = StateTags.keyedCombiningValueWithContext(
- "foo", accum2, CombineFnUtil.toFnWithContext(maxFn).asKeyedFn());
- StateTag<?, ?> barCoder1Max = StateTags.keyedCombiningValueWithContext(
- "bar", accum1, CombineFnUtil.toFnWithContext(maxFn).asKeyedFn());
+ StateTag<?, ?> fooCoder1Max1 = StateTags.combiningValueWithContext(
+ "foo", accum1, CombineFnUtil.toFnWithContext(maxFn));
+ StateTag<?, ?> fooCoder1Max2 = StateTags.combiningValueWithContext(
+ "foo", accum1, CombineFnUtil.toFnWithContext(maxFn));
+ StateTag<?, ?> fooCoder1Min = StateTags.combiningValueWithContext(
+ "foo", accum1, CombineFnUtil.toFnWithContext(minFn));
+
+ StateTag<?, ?> fooCoder2Max = StateTags.combiningValueWithContext(
+ "foo", accum2, CombineFnUtil.toFnWithContext(maxFn));
+ StateTag<?, ?> barCoder1Max = StateTags.combiningValueWithContext(
+ "bar", accum1, CombineFnUtil.toFnWithContext(maxFn));
// Same name, coder and combineFn
assertEquals(fooCoder1Max1, fooCoder1Max2);
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
index 068b37f..92d87b5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
@@ -40,8 +40,7 @@ import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTag.StateBinder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CombineFnUtil;
@@ -283,11 +282,10 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
@SuppressWarnings("unchecked")
InMemoryState<? extends WatermarkHoldState> existingState =
(InMemoryState<? extends WatermarkHoldState>)
- underlying.get().get(namespace, address, c);
+ underlying.get().get(namespace, address, c);
return existingState.copy();
} else {
- return new InMemoryWatermarkHold<>(
- timestampCombiner);
+ return new InMemoryWatermarkHold<>(timestampCombiner);
}
}
@@ -298,7 +296,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
@SuppressWarnings("unchecked")
InMemoryState<? extends ValueState<T>> existingState =
(InMemoryState<? extends ValueState<T>>)
- underlying.get().get(namespace, address, c);
+ underlying.get().get(namespace, address, c);
return existingState.copy();
} else {
return new InMemoryValue<>();
@@ -306,10 +304,11 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
}
@Override
- public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
- bindCombiningValue(
+ public <InputT, AccumT, OutputT>
+ CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
+ Coder<AccumT> accumCoder,
+ CombineFn<InputT, AccumT, OutputT> combineFn) {
if (containedInUnderlying(namespace, address)) {
@SuppressWarnings("unchecked")
InMemoryState<? extends CombiningState<InputT, AccumT, OutputT>> existingState =
@@ -317,8 +316,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
underlying.get().get(namespace, address, c);
return existingState.copy();
} else {
- return new InMemoryCombiningState<>(
- key, combineFn.asKeyedFn());
+ return new InMemoryCombiningState<>(combineFn);
}
}
@@ -329,7 +327,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
@SuppressWarnings("unchecked")
InMemoryState<? extends BagState<T>> existingState =
(InMemoryState<? extends BagState<T>>)
- underlying.get().get(namespace, address, c);
+ underlying.get().get(namespace, address, c);
return existingState.copy();
} else {
return new InMemoryBag<>();
@@ -353,7 +351,8 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
@Override
public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
StateTag<? super K, MapState<KeyT, ValueT>> address,
- Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
+ Coder<KeyT> mapKeyCoder,
+ Coder<ValueT> mapValueCoder) {
if (containedInUnderlying(namespace, address)) {
@SuppressWarnings("unchecked")
InMemoryState<? extends MapState<KeyT, ValueT>> existingState =
@@ -366,30 +365,12 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
}
@Override
- public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
- bindKeyedCombiningValue(
+ public <InputT, AccumT, OutputT>
+ CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
- KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
- if (containedInUnderlying(namespace, address)) {
- @SuppressWarnings("unchecked")
- InMemoryState<? extends CombiningState<InputT, AccumT, OutputT>> existingState =
- (InMemoryState<? extends CombiningState<InputT, AccumT, OutputT>>)
- underlying.get().get(namespace, address, c);
- return existingState.copy();
- } else {
- return new InMemoryCombiningState<>(key, combineFn);
- }
- }
-
- @Override
- public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
- bindKeyedCombiningValueWithContext(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
- return bindKeyedCombiningValue(
- address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
+ CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+ return bindCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
}
};
}
@@ -475,20 +456,11 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
@Override
public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
- bindKeyedCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
- return underlying.get(namespace, address, c);
- }
-
- @Override
- public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
- bindKeyedCombiningValueWithContext(
+ bindCombiningValueWithContext(
StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
- KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
- return bindKeyedCombiningValue(
+ CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+ return bindCombiningValue(
address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
}
};
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
index f0aeece..4d04745 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
@@ -40,7 +40,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
@@ -251,39 +250,6 @@ public class CopyOnAccessInMemoryStateInternalsTest {
}
@Test
- public void testKeyedAccumulatorCombiningStateWithUnderlying() throws Exception {
- CopyOnAccessInMemoryStateInternals<String> underlying =
- CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
- KeyedCombineFn<String, Long, long[], Long> sumLongFn = Sum.ofLongs().asKeyedFn();
-
- StateNamespace namespace = new StateNamespaceForTest("foo");
- CoderRegistry reg = pipeline.getCoderRegistry();
- StateTag<String, CombiningState<Long, long[], Long>> stateTag =
- StateTags.keyedCombiningValue(
- "summer",
- sumLongFn.getAccumulatorCoder(
- reg, StringUtf8Coder.of(), reg.getDefaultCoder(Long.class)),
- sumLongFn);
- GroupingState<Long, Long> underlyingValue = underlying.state(namespace, stateTag);
- assertThat(underlyingValue.read(), equalTo(0L));
-
- underlyingValue.add(1L);
- assertThat(underlyingValue.read(), equalTo(1L));
-
- CopyOnAccessInMemoryStateInternals<String> internals =
- CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
- GroupingState<Long, Long> copyOnAccessState = internals.state(namespace, stateTag);
- assertThat(copyOnAccessState.read(), equalTo(1L));
-
- copyOnAccessState.add(4L);
- assertThat(copyOnAccessState.read(), equalTo(5L));
- assertThat(underlyingValue.read(), equalTo(1L));
-
- GroupingState<Long, Long> reReadUnderlyingValue = underlying.state(namespace, stateTag);
- assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read()));
- }
-
- @Test
public void testWatermarkHoldStateWithUnderlying() {
CopyOnAccessInMemoryStateInternals<String> underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
index 99de5be..6a7689a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
@@ -188,8 +188,7 @@ class FlinkBatchTransformTranslators {
DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
context.getInputDataSet(context.getInput(transform));
- Combine.KeyedCombineFn<K, InputT, List<InputT>, List<InputT>> combineFn =
- new Concatenate<InputT>().asKeyedFn();
+ Combine.CombineFn<InputT, List<InputT>, List<InputT>> combineFn = new Concatenate<>();
KvCoder<K, InputT> inputCoder =
(KvCoder<K, InputT>) context.getInput(transform).getCoder();
@@ -200,7 +199,6 @@ class FlinkBatchTransformTranslators {
accumulatorCoder =
combineFn.getAccumulatorCoder(
context.getInput(transform).getPipeline().getCoderRegistry(),
- inputCoder.getKeyCoder(),
inputCoder.getValueCoder());
} catch (CannotProvideCoderException e) {
throw new RuntimeException(e);
@@ -337,8 +335,8 @@ class FlinkBatchTransformTranslators {
DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
context.getInputDataSet(context.getInput(transform));
- CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn =
- (CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT>) transform.getFn();
+ CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> combineFn =
+ (CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT>) transform.getFn();
KvCoder<K, InputT> inputCoder =
(KvCoder<K, InputT>) context.getInput(transform).getCoder();
@@ -349,7 +347,6 @@ class FlinkBatchTransformTranslators {
accumulatorCoder =
combineFn.getAccumulatorCoder(
context.getInput(transform).getPipeline().getCoderRegistry(),
- inputCoder.getKeyCoder(),
inputCoder.getValueCoder());
} catch (CannotProvideCoderException e) {
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java
index 83ff70d..6e27057 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java
@@ -19,8 +19,8 @@ package org.apache.beam.runners.flink.translation.functions;
import com.google.common.collect.ImmutableList;
import java.util.Collection;
-import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.core.PerKeyCombineFnRunners;
+import org.apache.beam.runners.core.GlobalCombineFnRunner;
+import org.apache.beam.runners.core.GlobalCombineFnRunners;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -53,7 +53,7 @@ public abstract class AbstractFlinkCombineRunner<
Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception;
/**
- * Adapter interface that allows using a {@link CombineFnBase.PerKeyCombineFn} to either produce
+ * Adapter interface that allows using a {@link CombineFnBase.GlobalCombineFn} to either produce
* the {@code AccumT} as output or to combine several accumulators into an {@code OutputT}.
* The former would be used for a partial combine while the latter is used for the final merging
* of accumulators.
@@ -72,17 +72,17 @@ public abstract class AbstractFlinkCombineRunner<
}
/**
- * A straight wrapper of {@link CombineFnBase.PerKeyCombineFn} that takes in {@code InputT}
+ * A straight wrapper of {@link CombineFnBase.GlobalCombineFn} that takes in {@code InputT}
* and emits {@code OutputT}.
*/
public static class CompleteFlinkCombiner<K, InputT, AccumT, OutputT> implements
FlinkCombiner<K, InputT, AccumT, OutputT> {
- private final PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> combineFnRunner;
+ private final GlobalCombineFnRunner<InputT, AccumT, OutputT> combineFnRunner;
public CompleteFlinkCombiner(
- CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn) {
- combineFnRunner = PerKeyCombineFnRunners.create(combineFn);
+ CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> combineFn) {
+ combineFnRunner = GlobalCombineFnRunners.create(combineFn);
}
@Override
@@ -90,22 +90,22 @@ public abstract class AbstractFlinkCombineRunner<
K key, InputT value, PipelineOptions options, SideInputReader sideInputReader,
Collection<? extends BoundedWindow> windows) {
AccumT accumulator =
- combineFnRunner.createAccumulator(key, options, sideInputReader, windows);
- return combineFnRunner.addInput(key, accumulator, value, options, sideInputReader, windows);
+ combineFnRunner.createAccumulator(options, sideInputReader, windows);
+ return combineFnRunner.addInput(accumulator, value, options, sideInputReader, windows);
}
@Override
public AccumT addInput(
K key, AccumT accumulator, InputT value, PipelineOptions options,
SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return combineFnRunner.addInput(key, accumulator, value, options, sideInputReader, windows);
+ return combineFnRunner.addInput(accumulator, value, options, sideInputReader, windows);
}
@Override
public OutputT extractOutput(
K key, AccumT accumulator, PipelineOptions options, SideInputReader sideInputReader,
Collection<? extends BoundedWindow> windows) {
- return combineFnRunner.extractOutput(key, accumulator, options, sideInputReader, windows);
+ return combineFnRunner.extractOutput(accumulator, options, sideInputReader, windows);
}
}
@@ -115,10 +115,10 @@ public abstract class AbstractFlinkCombineRunner<
public static class PartialFlinkCombiner<K, InputT, AccumT> implements
FlinkCombiner<K, InputT, AccumT, AccumT> {
- private final PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner;
+ private final GlobalCombineFnRunner<InputT, AccumT, ?> combineFnRunner;
- public PartialFlinkCombiner(CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn) {
- combineFnRunner = PerKeyCombineFnRunners.create(combineFn);
+ public PartialFlinkCombiner(CombineFnBase.GlobalCombineFn<InputT, AccumT, ?> combineFn) {
+ combineFnRunner = GlobalCombineFnRunners.create(combineFn);
}
@Override
@@ -126,15 +126,15 @@ public abstract class AbstractFlinkCombineRunner<
K key, InputT value, PipelineOptions options, SideInputReader sideInputReader,
Collection<? extends BoundedWindow> windows) {
AccumT accumulator =
- combineFnRunner.createAccumulator(key, options, sideInputReader, windows);
- return combineFnRunner.addInput(key, accumulator, value, options, sideInputReader, windows);
+ combineFnRunner.createAccumulator(options, sideInputReader, windows);
+ return combineFnRunner.addInput(accumulator, value, options, sideInputReader, windows);
}
@Override
public AccumT addInput(
K key, AccumT accumulator, InputT value, PipelineOptions options,
SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return combineFnRunner.addInput(key, accumulator, value, options, sideInputReader, windows);
+ return combineFnRunner.addInput(accumulator, value, options, sideInputReader, windows);
}
@Override
@@ -151,10 +151,10 @@ public abstract class AbstractFlinkCombineRunner<
public static class FinalFlinkCombiner<K, AccumT, OutputT> implements
FlinkCombiner<K, AccumT, AccumT, OutputT> {
- private final PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner;
+ private final GlobalCombineFnRunner<?, AccumT, OutputT> combineFnRunner;
- public FinalFlinkCombiner(CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> combineFn) {
- combineFnRunner = PerKeyCombineFnRunners.create(combineFn);
+ public FinalFlinkCombiner(CombineFnBase.GlobalCombineFn<?, AccumT, OutputT> combineFn) {
+ combineFnRunner = GlobalCombineFnRunners.create(combineFn);
}
@Override
@@ -169,14 +169,14 @@ public abstract class AbstractFlinkCombineRunner<
K key, AccumT accumulator, AccumT value, PipelineOptions options,
SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
return combineFnRunner.mergeAccumulators(
- key, ImmutableList.of(accumulator, value), options, sideInputReader, windows);
+ ImmutableList.of(accumulator, value), options, sideInputReader, windows);
}
@Override
public OutputT extractOutput(
K key, AccumT accumulator, PipelineOptions options, SideInputReader sideInputReader,
Collection<? extends BoundedWindow> windows) {
- return combineFnRunner.extractOutput(key, accumulator, options, sideInputReader, windows);
+ return combineFnRunner.extractOutput(accumulator, options, sideInputReader, windows);
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
index 3712598..9ccf079 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
@@ -41,7 +41,7 @@ public class FlinkMergingNonShuffleReduceFunction<
K, InputT, AccumT, OutputT, W extends BoundedWindow>
extends RichGroupReduceFunction<WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, OutputT>>> {
- private final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn;
+ private final CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> combineFn;
private final WindowingStrategy<Object, W> windowingStrategy;
@@ -50,12 +50,12 @@ public class FlinkMergingNonShuffleReduceFunction<
private final SerializedPipelineOptions serializedOptions;
public FlinkMergingNonShuffleReduceFunction(
- CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn,
+ CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> combineFn,
WindowingStrategy<Object, W> windowingStrategy,
Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
PipelineOptions pipelineOptions) {
- this.combineFn = keyedCombineFn;
+ this.combineFn = combineFn;
this.windowingStrategy = windowingStrategy;
this.sideInputs = sideInputs;
@@ -75,7 +75,6 @@ public class FlinkMergingNonShuffleReduceFunction<
new FlinkSideInputReader(sideInputs, getRuntimeContext());
AbstractFlinkCombineRunner<K, InputT, AccumT, OutputT, W> reduceRunner;
-
if (windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) {
reduceRunner = new SortingFlinkCombineRunner<>();
} else {
@@ -83,13 +82,12 @@ public class FlinkMergingNonShuffleReduceFunction<
}
reduceRunner.combine(
- new AbstractFlinkCombineRunner.CompleteFlinkCombiner<>(combineFn),
+ new AbstractFlinkCombineRunner.CompleteFlinkCombiner<K, InputT, AccumT, OutputT>(combineFn),
windowingStrategy,
sideInputReader,
options,
elements,
out);
-
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
index 9a44840..4099f52 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
@@ -42,7 +42,7 @@ import org.apache.flink.util.Collector;
public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWindow>
extends RichGroupCombineFunction<WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, AccumT>>> {
- protected final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn;
+ protected final CombineFnBase.GlobalCombineFn<InputT, AccumT, ?> combineFn;
protected final WindowingStrategy<Object, W> windowingStrategy;
@@ -51,7 +51,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
public FlinkPartialReduceFunction(
- CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn,
+ CombineFnBase.GlobalCombineFn<InputT, AccumT, ?> combineFn,
WindowingStrategy<Object, W> windowingStrategy,
Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
PipelineOptions pipelineOptions) {
@@ -83,7 +83,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
}
reduceRunner.combine(
- new AbstractFlinkCombineRunner.PartialFlinkCombiner<>(combineFn),
+ new AbstractFlinkCombineRunner.PartialFlinkCombiner<K, InputT, AccumT>(combineFn),
windowingStrategy,
sideInputReader,
options,
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
index 6c1a2e4..90dcbff 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
@@ -42,7 +42,7 @@ import org.apache.flink.util.Collector;
public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
extends RichGroupReduceFunction<WindowedValue<KV<K, AccumT>>, WindowedValue<KV<K, OutputT>>> {
- protected final CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> combineFn;
+ protected final CombineFnBase.GlobalCombineFn<?, AccumT, OutputT> combineFn;
protected final WindowingStrategy<Object, W> windowingStrategy;
@@ -51,12 +51,12 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
protected final SerializedPipelineOptions serializedOptions;
public FlinkReduceFunction(
- CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> keyedCombineFn,
+ CombineFnBase.GlobalCombineFn<?, AccumT, OutputT> combineFn,
WindowingStrategy<Object, W> windowingStrategy,
Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
PipelineOptions pipelineOptions) {
- this.combineFn = keyedCombineFn;
+ this.combineFn = combineFn;
this.windowingStrategy = windowingStrategy;
this.sideInputs = sideInputs;
@@ -83,15 +83,13 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
} else {
reduceRunner = new SortingFlinkCombineRunner<>();
}
-
reduceRunner.combine(
- new AbstractFlinkCombineRunner.FinalFlinkCombiner<>(combineFn),
+ new AbstractFlinkCombineRunner.FinalFlinkCombiner<K, AccumT, OutputT>(combineFn),
windowingStrategy,
sideInputReader,
options,
elements,
out);
-
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
index eac465c..4aacb4a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
@@ -43,7 +43,6 @@ import org.joda.time.Instant;
public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends BoundedWindow>
extends AbstractFlinkCombineRunner<K, InputT, AccumT, OutputT, W> {
-
@Override
public void combine(
FlinkCombiner<K, InputT, AccumT, OutputT> flinkCombiner,