You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/01 02:25:37 UTC
[1/6] beam git commit: Update Dataflow worker version to beam-master-20170430
Repository: beam
Updated Branches:
refs/heads/master a198f8d23 -> 9f2733ac4
Update Dataflow worker version to beam-master-20170430
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/07ca542f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/07ca542f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/07ca542f
Branch: refs/heads/master
Commit: 07ca542f282c2ce3aff9f3f2966fa80422a59242
Parents: 7e04924
Author: Kenneth Knowles <kl...@google.com>
Authored: Sun Apr 30 15:08:44 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Apr 30 18:17:42 2017 -0700
----------------------------------------------------------------------
runners/google-cloud-dataflow-java/pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/07ca542f/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index bfcb189..cb0fa7f 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -33,7 +33,7 @@
<packaging>jar</packaging>
<properties>
- <dataflow.container_version>beam-master-20170428-2</dataflow.container_version>
+ <dataflow.container_version>beam-master-20170430</dataflow.container_version>
<dataflow.fnapi_environment_major_version>1</dataflow.fnapi_environment_major_version>
<dataflow.legacy_environment_major_version>6</dataflow.legacy_environment_major_version>
</properties>
[4/6] beam git commit: Remove KeyedCombineFn
Posted by ke...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
index d015c38..31e931c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
@@ -97,92 +97,74 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
StateTag<? super K, T> address,
final StateContext<?> context) {
- return address.bind(new StateTag.StateBinder<K>() {
+ return address.bind(
+ new StateTag.StateBinder<K>() {
- @Override
- public <T> ValueState<T> bindValue(
- StateTag<? super K, ValueState<T>> address,
- Coder<T> coder) {
+ @Override
+ public <T> ValueState<T> bindValue(
+ StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
- return new FlinkBroadcastValueState<>(stateBackend, address, namespace, coder);
- }
+ return new FlinkBroadcastValueState<>(stateBackend, address, namespace, coder);
+ }
- @Override
- public <T> BagState<T> bindBag(
- StateTag<? super K, BagState<T>> address,
- Coder<T> elemCoder) {
+ @Override
+ public <T> BagState<T> bindBag(
+ StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
- return new FlinkBroadcastBagState<>(stateBackend, address, namespace, elemCoder);
- }
-
- @Override
- public <T> SetState<T> bindSet(
- StateTag<? super K, SetState<T>> address,
- Coder<T> elemCoder) {
- throw new UnsupportedOperationException(
- String.format("%s is not supported", SetState.class.getSimpleName()));
- }
+ return new FlinkBroadcastBagState<>(stateBackend, address, namespace, elemCoder);
+ }
- @Override
- public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
- StateTag<? super K, MapState<KeyT, ValueT>> spec,
- Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
- throw new UnsupportedOperationException(
- String.format("%s is not supported", MapState.class.getSimpleName()));
- }
+ @Override
+ public <T> SetState<T> bindSet(
+ StateTag<? super K, SetState<T>> address, Coder<T> elemCoder) {
+ throw new UnsupportedOperationException(
+ String.format("%s is not supported", SetState.class.getSimpleName()));
+ }
- @Override
- public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT>
- bindCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+ @Override
+ public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
+ StateTag<? super K, MapState<KeyT, ValueT>> spec,
+ Coder<KeyT> mapKeyCoder,
+ Coder<ValueT> mapValueCoder) {
+ throw new UnsupportedOperationException(
+ String.format("%s is not supported", MapState.class.getSimpleName()));
+ }
- return new FlinkCombiningState<>(
- stateBackend, address, combineFn, namespace, accumCoder);
- }
+ @Override
+ public <InputT, AccumT, OutputT>
+ CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ Coder<AccumT> accumCoder,
+ Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
- @Override
- public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
- return new FlinkKeyedCombiningState<>(
- stateBackend,
- address,
- combineFn,
- namespace,
- accumCoder,
- FlinkBroadcastStateInternals.this);
- }
+ return new FlinkCombiningState<>(
+ stateBackend, address, combineFn, namespace, accumCoder);
+ }
- @Override
- public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- CombineWithContext.KeyedCombineFnWithContext<
- ? super K, InputT, AccumT, OutputT> combineFn) {
- return new FlinkCombiningStateWithContext<>(
- stateBackend,
- address,
- combineFn,
- namespace,
- accumCoder,
- FlinkBroadcastStateInternals.this,
- CombineContextFactory.createFromStateContext(context));
- }
+ @Override
+ public <InputT, AccumT, OutputT>
+ CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ Coder<AccumT> accumCoder,
+ CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+ return new FlinkCombiningStateWithContext<>(
+ stateBackend,
+ address,
+ combineFn,
+ namespace,
+ accumCoder,
+ FlinkBroadcastStateInternals.this,
+ CombineContextFactory.createFromStateContext(context));
+ }
- @Override
- public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
- StateTag<? super K, WatermarkHoldState> address,
- TimestampCombiner timestampCombiner) {
- throw new UnsupportedOperationException(
- String.format("%s is not supported", WatermarkHoldState.class.getSimpleName()));
- }
- });
+ @Override
+ public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+ StateTag<? super K, WatermarkHoldState> address,
+ TimestampCombiner timestampCombiner) {
+ throw new UnsupportedOperationException(
+ String.format("%s is not supported", WatermarkHoldState.class.getSimpleName()));
+ }
+ });
}
/**
@@ -587,13 +569,13 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
private final StateNamespace namespace;
private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
- private final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
+ private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
private final FlinkBroadcastStateInternals<K> flinkStateInternals;
FlinkKeyedCombiningState(
DefaultOperatorStateBackend flinkStateBackend,
StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn,
+ Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
StateNamespace namespace,
Coder<AccumT> accumCoder,
FlinkBroadcastStateInternals<K> flinkStateInternals) {
@@ -616,9 +598,9 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
try {
AccumT current = readInternal();
if (current == null) {
- current = combineFn.createAccumulator(flinkStateInternals.getKey());
+ current = combineFn.createAccumulator();
}
- current = combineFn.addInput(flinkStateInternals.getKey(), current, value);
+ current = combineFn.addInput(current, value);
writeInternal(current);
} catch (Exception e) {
throw new RuntimeException("Error adding to state." , e);
@@ -632,9 +614,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
if (current == null) {
writeInternal(accum);
} else {
- current = combineFn.mergeAccumulators(
- flinkStateInternals.getKey(),
- Arrays.asList(current, accum));
+ current = combineFn.mergeAccumulators(Arrays.asList(current, accum));
writeInternal(current);
}
} catch (Exception e) {
@@ -653,7 +633,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
@Override
public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
- return combineFn.mergeAccumulators(flinkStateInternals.getKey(), accumulators);
+ return combineFn.mergeAccumulators(accumulators);
}
@Override
@@ -661,11 +641,9 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
try {
AccumT accum = readInternal();
if (accum != null) {
- return combineFn.extractOutput(flinkStateInternals.getKey(), accum);
+ return combineFn.extractOutput(accum);
} else {
- return combineFn.extractOutput(
- flinkStateInternals.getKey(),
- combineFn.createAccumulator(flinkStateInternals.getKey()));
+ return combineFn.extractOutput(combineFn.createAccumulator());
}
} catch (Exception e) {
throw new RuntimeException("Error reading state.", e);
@@ -727,16 +705,14 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
private final StateNamespace namespace;
private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
- private final CombineWithContext.KeyedCombineFnWithContext<
- ? super K, InputT, AccumT, OutputT> combineFn;
+ private final CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn;
private final FlinkBroadcastStateInternals<K> flinkStateInternals;
private final CombineWithContext.Context context;
FlinkCombiningStateWithContext(
DefaultOperatorStateBackend flinkStateBackend,
StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- CombineWithContext.KeyedCombineFnWithContext<
- ? super K, InputT, AccumT, OutputT> combineFn,
+ CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn,
StateNamespace namespace,
Coder<AccumT> accumCoder,
FlinkBroadcastStateInternals<K> flinkStateInternals,
@@ -761,9 +737,9 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
try {
AccumT current = readInternal();
if (current == null) {
- current = combineFn.createAccumulator(flinkStateInternals.getKey(), context);
+ current = combineFn.createAccumulator(context);
}
- current = combineFn.addInput(flinkStateInternals.getKey(), current, value, context);
+ current = combineFn.addInput(current, value, context);
writeInternal(current);
} catch (Exception e) {
throw new RuntimeException("Error adding to state." , e);
@@ -778,10 +754,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
if (current == null) {
writeInternal(accum);
} else {
- current = combineFn.mergeAccumulators(
- flinkStateInternals.getKey(),
- Arrays.asList(current, accum),
- context);
+ current = combineFn.mergeAccumulators(Arrays.asList(current, accum), context);
writeInternal(current);
}
} catch (Exception e) {
@@ -800,14 +773,14 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
@Override
public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
- return combineFn.mergeAccumulators(flinkStateInternals.getKey(), accumulators, context);
+ return combineFn.mergeAccumulators(accumulators, context);
}
@Override
public OutputT read() {
try {
AccumT accum = readInternal();
- return combineFn.extractOutput(flinkStateInternals.getKey(), accum, context);
+ return combineFn.extractOutput(accum, context);
} catch (Exception e) {
throw new RuntimeException("Error reading state.", e);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
index 2dd7c96..67d7966 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
@@ -120,79 +120,66 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
StateTag<? super K, T> address,
final StateContext<?> context) {
- return address.bind(new StateTag.StateBinder<K>() {
-
- @Override
- public <T> ValueState<T> bindValue(
- StateTag<? super K, ValueState<T>> address,
- Coder<T> coder) {
- throw new UnsupportedOperationException(
- String.format("%s is not supported", ValueState.class.getSimpleName()));
- }
+ return address.bind(
+ new StateTag.StateBinder<K>() {
+
+ @Override
+ public <T> ValueState<T> bindValue(
+ StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+ throw new UnsupportedOperationException(
+ String.format("%s is not supported", ValueState.class.getSimpleName()));
+ }
- @Override
- public <T> BagState<T> bindBag(
- StateTag<? super K, BagState<T>> address,
- Coder<T> elemCoder) {
+ @Override
+ public <T> BagState<T> bindBag(
+ StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
- return new FlinkKeyGroupBagState<>(address, namespace, elemCoder);
- }
-
- @Override
- public <T> SetState<T> bindSet(
- StateTag<? super K, SetState<T>> address,
- Coder<T> elemCoder) {
- throw new UnsupportedOperationException(
- String.format("%s is not supported", SetState.class.getSimpleName()));
- }
-
- @Override
- public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
- StateTag<? super K, MapState<KeyT, ValueT>> spec,
- Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
- throw new UnsupportedOperationException(
- String.format("%s is not supported", MapState.class.getSimpleName()));
- }
+ return new FlinkKeyGroupBagState<>(address, namespace, elemCoder);
+ }
- @Override
- public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT>
- bindCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
- throw new UnsupportedOperationException("bindCombiningValue is not supported.");
- }
+ @Override
+ public <T> SetState<T> bindSet(
+ StateTag<? super K, SetState<T>> address, Coder<T> elemCoder) {
+ throw new UnsupportedOperationException(
+ String.format("%s is not supported", SetState.class.getSimpleName()));
+ }
- @Override
- public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
- throw new UnsupportedOperationException("bindKeyedCombiningValue is not supported.");
+ @Override
+ public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
+ StateTag<? super K, MapState<KeyT, ValueT>> spec,
+ Coder<KeyT> mapKeyCoder,
+ Coder<ValueT> mapValueCoder) {
+ throw new UnsupportedOperationException(
+ String.format("%s is not supported", MapState.class.getSimpleName()));
+ }
- }
+ @Override
+ public <InputT, AccumT, OutputT>
+ CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ Coder<AccumT> accumCoder,
+ Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+ throw new UnsupportedOperationException("bindCombiningValue is not supported.");
+ }
- @Override
- public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- CombineWithContext.KeyedCombineFnWithContext<
- ? super K, InputT, AccumT, OutputT> combineFn) {
- throw new UnsupportedOperationException(
- "bindKeyedCombiningValueWithContext is not supported.");
- }
+ @Override
+ public <InputT, AccumT, OutputT>
+ CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ Coder<AccumT> accumCoder,
+ CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+ throw new UnsupportedOperationException(
+ "bindCombiningValueWithContext is not supported.");
+ }
- @Override
- public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
- StateTag<? super K, WatermarkHoldState> address,
- TimestampCombiner timestampCombiner) {
- throw new UnsupportedOperationException(
- String.format("%s is not supported", CombiningState.class.getSimpleName()));
- }
- });
+ @Override
+ public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+ StateTag<? super K, WatermarkHoldState> address,
+ TimestampCombiner timestampCombiner) {
+ throw new UnsupportedOperationException(
+ String.format("%s is not supported", CombiningState.class.getSimpleName()));
+ }
+ });
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
index 17ea62a..ef6c3b2 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
@@ -80,79 +80,66 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
StateTag<? super K, T> address,
final StateContext<?> context) {
- return address.bind(new StateTag.StateBinder<K>() {
-
- @Override
- public <T> ValueState<T> bindValue(
- StateTag<? super K, ValueState<T>> address,
- Coder<T> coder) {
- throw new UnsupportedOperationException(
- String.format("%s is not supported", ValueState.class.getSimpleName()));
- }
+ return address.bind(
+ new StateTag.StateBinder<K>() {
+
+ @Override
+ public <T> ValueState<T> bindValue(
+ StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+ throw new UnsupportedOperationException(
+ String.format("%s is not supported", ValueState.class.getSimpleName()));
+ }
- @Override
- public <T> BagState<T> bindBag(
- StateTag<? super K, BagState<T>> address,
- Coder<T> elemCoder) {
+ @Override
+ public <T> BagState<T> bindBag(
+ StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
- return new FlinkSplitBagState<>(stateBackend, address, namespace, elemCoder);
- }
-
- @Override
- public <T> SetState<T> bindSet(
- StateTag<? super K, SetState<T>> address,
- Coder<T> elemCoder) {
- throw new UnsupportedOperationException(
- String.format("%s is not supported", SetState.class.getSimpleName()));
- }
-
- @Override
- public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
- StateTag<? super K, MapState<KeyT, ValueT>> spec,
- Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
- throw new UnsupportedOperationException(
- String.format("%s is not supported", MapState.class.getSimpleName()));
- }
+ return new FlinkSplitBagState<>(stateBackend, address, namespace, elemCoder);
+ }
- @Override
- public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT>
- bindCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
- throw new UnsupportedOperationException("bindCombiningValue is not supported.");
- }
+ @Override
+ public <T> SetState<T> bindSet(
+ StateTag<? super K, SetState<T>> address, Coder<T> elemCoder) {
+ throw new UnsupportedOperationException(
+ String.format("%s is not supported", SetState.class.getSimpleName()));
+ }
- @Override
- public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
- throw new UnsupportedOperationException("bindKeyedCombiningValue is not supported.");
+ @Override
+ public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
+ StateTag<? super K, MapState<KeyT, ValueT>> spec,
+ Coder<KeyT> mapKeyCoder,
+ Coder<ValueT> mapValueCoder) {
+ throw new UnsupportedOperationException(
+ String.format("%s is not supported", MapState.class.getSimpleName()));
+ }
- }
+ @Override
+ public <InputT, AccumT, OutputT>
+ CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ Coder<AccumT> accumCoder,
+ Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+ throw new UnsupportedOperationException("bindCombiningValue is not supported.");
+ }
- @Override
- public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- CombineWithContext.KeyedCombineFnWithContext<
- ? super K, InputT, AccumT, OutputT> combineFn) {
- throw new UnsupportedOperationException(
- "bindKeyedCombiningValueWithContext is not supported.");
- }
+ @Override
+ public <InputT, AccumT, OutputT>
+ CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ Coder<AccumT> accumCoder,
+ CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+ throw new UnsupportedOperationException(
+ "bindCombiningValueWithContext is not supported.");
+ }
- @Override
- public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
- StateTag<? super K, WatermarkHoldState> address,
- TimestampCombiner timestampCombiner) {
- throw new UnsupportedOperationException(
- String.format("%s is not supported", CombiningState.class.getSimpleName()));
- }
- });
+ @Override
+ public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+ StateTag<? super K, WatermarkHoldState> address,
+ TimestampCombiner timestampCombiner) {
+ throw new UnsupportedOperationException(
+ String.format("%s is not supported", CombiningState.class.getSimpleName()));
+ }
+ });
}
private static class FlinkSplitBagState<K, T> implements BagState<T> {
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index 878c914..c99d085 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -106,93 +106,75 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
StateTag<? super K, T> address,
final StateContext<?> context) {
- return address.bind(new StateTag.StateBinder<K>() {
+ return address.bind(
+ new StateTag.StateBinder<K>() {
- @Override
- public <T> ValueState<T> bindValue(
- StateTag<? super K, ValueState<T>> address,
- Coder<T> coder) {
+ @Override
+ public <T> ValueState<T> bindValue(
+ StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
- return new FlinkValueState<>(flinkStateBackend, address, namespace, coder);
- }
-
- @Override
- public <T> BagState<T> bindBag(
- StateTag<? super K, BagState<T>> address,
- Coder<T> elemCoder) {
+ return new FlinkValueState<>(flinkStateBackend, address, namespace, coder);
+ }
- return new FlinkBagState<>(flinkStateBackend, address, namespace, elemCoder);
- }
+ @Override
+ public <T> BagState<T> bindBag(
+ StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
- @Override
- public <T> SetState<T> bindSet(
- StateTag<? super K, SetState<T>> address,
- Coder<T> elemCoder) {
- throw new UnsupportedOperationException(
- String.format("%s is not supported", SetState.class.getSimpleName()));
- }
+ return new FlinkBagState<>(flinkStateBackend, address, namespace, elemCoder);
+ }
- @Override
- public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
- StateTag<? super K, MapState<KeyT, ValueT>> spec,
- Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
- throw new UnsupportedOperationException(
- String.format("%s is not supported", MapState.class.getSimpleName()));
- }
+ @Override
+ public <T> SetState<T> bindSet(
+ StateTag<? super K, SetState<T>> address, Coder<T> elemCoder) {
+ throw new UnsupportedOperationException(
+ String.format("%s is not supported", SetState.class.getSimpleName()));
+ }
- @Override
- public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT>
- bindCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+ @Override
+ public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
+ StateTag<? super K, MapState<KeyT, ValueT>> spec,
+ Coder<KeyT> mapKeyCoder,
+ Coder<ValueT> mapValueCoder) {
+ throw new UnsupportedOperationException(
+ String.format("%s is not supported", MapState.class.getSimpleName()));
+ }
- return new FlinkCombiningState<>(
- flinkStateBackend, address, combineFn, namespace, accumCoder);
- }
+ @Override
+ public <InputT, AccumT, OutputT>
+ CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ Coder<AccumT> accumCoder,
+ Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
- @Override
- public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
- return new FlinkKeyedCombiningState<>(
- flinkStateBackend,
- address,
- combineFn,
- namespace,
- accumCoder,
- FlinkStateInternals.this);
- }
+ return new FlinkCombiningState<>(
+ flinkStateBackend, address, combineFn, namespace, accumCoder);
+ }
- @Override
- public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- CombineWithContext.KeyedCombineFnWithContext<
- ? super K, InputT, AccumT, OutputT> combineFn) {
- return new FlinkCombiningStateWithContext<>(
- flinkStateBackend,
- address,
- combineFn,
- namespace,
- accumCoder,
- FlinkStateInternals.this,
- CombineContextFactory.createFromStateContext(context));
- }
+ @Override
+ public <InputT, AccumT, OutputT>
+ CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ Coder<AccumT> accumCoder,
+ CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+ return new FlinkCombiningStateWithContext<>(
+ flinkStateBackend,
+ address,
+ combineFn,
+ namespace,
+ accumCoder,
+ FlinkStateInternals.this,
+ CombineContextFactory.createFromStateContext(context));
+ }
- @Override
- public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
- StateTag<? super K, WatermarkHoldState> address,
- TimestampCombiner timestampCombiner) {
+ @Override
+ public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+ StateTag<? super K, WatermarkHoldState> address,
+ TimestampCombiner timestampCombiner) {
- return new FlinkWatermarkHoldState<>(
- flinkStateBackend, FlinkStateInternals.this, address, namespace, timestampCombiner);
- }
- });
+ return new FlinkWatermarkHoldState<>(
+ flinkStateBackend, FlinkStateInternals.this, address, namespace, timestampCombiner);
+ }
+ });
}
private static class FlinkValueState<K, T> implements ValueState<T> {
@@ -566,7 +548,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
private final StateNamespace namespace;
private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
- private final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
+ private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
private final FlinkStateInternals<K> flinkStateInternals;
@@ -574,7 +556,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
FlinkKeyedCombiningState(
KeyedStateBackend<ByteBuffer> flinkStateBackend,
StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn,
+ Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
StateNamespace namespace,
Coder<AccumT> accumCoder,
FlinkStateInternals<K> flinkStateInternals) {
@@ -606,9 +588,9 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
AccumT current = state.value();
if (current == null) {
- current = combineFn.createAccumulator(flinkStateInternals.getKey());
+ current = combineFn.createAccumulator();
}
- current = combineFn.addInput(flinkStateInternals.getKey(), current, value);
+ current = combineFn.addInput(current, value);
state.update(current);
} catch (Exception e) {
throw new RuntimeException("Error adding to state." , e);
@@ -628,9 +610,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
if (current == null) {
state.update(accum);
} else {
- current = combineFn.mergeAccumulators(
- flinkStateInternals.getKey(),
- Lists.newArrayList(current, accum));
+ current = combineFn.mergeAccumulators(Lists.newArrayList(current, accum));
state.update(current);
}
} catch (Exception e) {
@@ -652,7 +632,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
@Override
public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
- return combineFn.mergeAccumulators(flinkStateInternals.getKey(), accumulators);
+ return combineFn.mergeAccumulators(accumulators);
}
@Override
@@ -666,11 +646,9 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
AccumT accum = state.value();
if (accum != null) {
- return combineFn.extractOutput(flinkStateInternals.getKey(), accum);
+ return combineFn.extractOutput(accum);
} else {
- return combineFn.extractOutput(
- flinkStateInternals.getKey(),
- combineFn.createAccumulator(flinkStateInternals.getKey()));
+ return combineFn.extractOutput(combineFn.createAccumulator());
}
} catch (Exception e) {
throw new RuntimeException("Error reading state.", e);
@@ -741,8 +719,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
private final StateNamespace namespace;
private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
- private final CombineWithContext.KeyedCombineFnWithContext<
- ? super K, InputT, AccumT, OutputT> combineFn;
+ private final CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn;
private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
private final FlinkStateInternals<K> flinkStateInternals;
@@ -751,8 +728,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
FlinkCombiningStateWithContext(
KeyedStateBackend<ByteBuffer> flinkStateBackend,
StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- CombineWithContext.KeyedCombineFnWithContext<
- ? super K, InputT, AccumT, OutputT> combineFn,
+ CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn,
StateNamespace namespace,
Coder<AccumT> accumCoder,
FlinkStateInternals<K> flinkStateInternals,
@@ -786,9 +762,9 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
AccumT current = state.value();
if (current == null) {
- current = combineFn.createAccumulator(flinkStateInternals.getKey(), context);
+ current = combineFn.createAccumulator(context);
}
- current = combineFn.addInput(flinkStateInternals.getKey(), current, value, context);
+ current = combineFn.addInput(current, value, context);
state.update(current);
} catch (Exception e) {
throw new RuntimeException("Error adding to state." , e);
@@ -808,10 +784,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
if (current == null) {
state.update(accum);
} else {
- current = combineFn.mergeAccumulators(
- flinkStateInternals.getKey(),
- Lists.newArrayList(current, accum),
- context);
+ current = combineFn.mergeAccumulators(Lists.newArrayList(current, accum), context);
state.update(current);
}
} catch (Exception e) {
@@ -833,7 +806,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
@Override
public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
- return combineFn.mergeAccumulators(flinkStateInternals.getKey(), accumulators, context);
+ return combineFn.mergeAccumulators(accumulators, context);
}
@Override
@@ -846,7 +819,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
flinkStateDescriptor);
AccumT accum = state.value();
- return combineFn.extractOutput(flinkStateInternals.getKey(), accum, context);
+ return combineFn.extractOutput(accum, context);
} catch (Exception e) {
throw new RuntimeException("Error reading state.", e);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
index c967521..cdc23ff 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
@@ -31,8 +31,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CombineFnUtil;
@@ -142,27 +141,17 @@ class SparkStateInternals<K> implements StateInternals<K> {
StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
CombineFn<InputT, AccumT, OutputT> combineFn) {
- return new SparkCombiningState<>(namespace, address, accumCoder, key,
- combineFn.<K>asKeyedFn());
+ return new SparkCombiningState<>(namespace, address, accumCoder, combineFn);
}
@Override
public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
- bindKeyedCombiningValue(
+ bindCombiningValueWithContext(
StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
- KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
- return new SparkCombiningState<>(namespace, address, accumCoder, key, combineFn);
- }
-
- @Override
- public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
- bindKeyedCombiningValueWithContext(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
- return new SparkCombiningState<>(namespace, address, accumCoder, key,
- CombineFnUtil.bindContext(combineFn, c));
+ CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+ return new SparkCombiningState<>(
+ namespace, address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
}
@Override
@@ -307,17 +296,14 @@ class SparkStateInternals<K> implements StateInternals<K> {
extends AbstractState<AccumT>
implements CombiningState<InputT, AccumT, OutputT> {
- private final K key;
- private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
+ private final CombineFn<InputT, AccumT, OutputT> combineFn;
private SparkCombiningState(
StateNamespace namespace,
StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> coder,
- K key,
- KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
+ CombineFn<InputT, AccumT, OutputT> combineFn) {
super(namespace, address, coder);
- this.key = key;
this.combineFn = combineFn;
}
@@ -328,13 +314,13 @@ class SparkStateInternals<K> implements StateInternals<K> {
@Override
public OutputT read() {
- return combineFn.extractOutput(key, getAccum());
+ return combineFn.extractOutput(getAccum());
}
@Override
public void add(InputT input) {
AccumT accum = getAccum();
- combineFn.addInput(key, accum, input);
+ combineFn.addInput(accum, input);
writeValue(accum);
}
@@ -342,7 +328,7 @@ class SparkStateInternals<K> implements StateInternals<K> {
public AccumT getAccum() {
AccumT accum = readValue();
if (accum == null) {
- accum = combineFn.createAccumulator(key);
+ accum = combineFn.createAccumulator();
}
return accum;
}
@@ -363,13 +349,13 @@ class SparkStateInternals<K> implements StateInternals<K> {
@Override
public void addAccum(AccumT accum) {
- accum = combineFn.mergeAccumulators(key, Arrays.asList(getAccum(), accum));
+ accum = combineFn.mergeAccumulators(Arrays.asList(getAccum(), accum));
writeValue(accum);
}
@Override
public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
- return combineFn.mergeAccumulators(key, accumulators);
+ return combineFn.mergeAccumulators(accumulators);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
index 66c03bc..58db8e4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
@@ -41,14 +41,14 @@ import org.joda.time.Instant;
/**
- * A {@link org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn}
+ * A {@link org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn}
* with a {@link org.apache.beam.sdk.transforms.CombineWithContext.Context} for the SparkRunner.
*/
public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstractCombineFn {
- private final CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn;
+ private final CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn;
public SparkKeyedCombineFn(
- CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn,
+ CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn,
SparkRuntimeContext runtimeContext,
Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs,
WindowingStrategy<?, ?> windowingStrategy) {
@@ -59,8 +59,7 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
/** Applying the combine function directly on a key's grouped values - post grouping. */
public OutputT apply(WindowedValue<KV<K, Iterable<InputT>>> windowedKv) {
// apply combine function on grouped values.
- return combineFn.apply(windowedKv.getValue().getKey(), windowedKv.getValue().getValue(),
- ctxtForInput(windowedKv));
+ return combineFn.apply(windowedKv.getValue().getValue(), ctxtForInput(windowedKv));
}
/**
@@ -83,8 +82,8 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
// first create the accumulator and accumulate first input.
K key = currentInput.getValue().getKey();
- AccumT accumulator = combineFn.createAccumulator(key, ctxtForInput(currentInput));
- accumulator = combineFn.addInput(key, accumulator, currentInput.getValue().getValue(),
+ AccumT accumulator = combineFn.createAccumulator(ctxtForInput(currentInput));
+ accumulator = combineFn.addInput(accumulator, currentInput.getValue().getValue(),
ctxtForInput(currentInput));
// keep track of the timestamps assigned by the TimestampCombiner.
@@ -114,7 +113,7 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
currentWindow = merge((IntervalWindow) currentWindow, (IntervalWindow) nextWindow);
}
// keep accumulating and carry on ;-)
- accumulator = combineFn.addInput(key, accumulator, nextValue.getValue().getValue(),
+ accumulator = combineFn.addInput(accumulator, nextValue.getValue().getValue(),
ctxtForInput(nextValue));
windowTimestamp =
timestampCombiner.combine(
@@ -128,8 +127,8 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
output.add(WindowedValue.of(KV.of(key, accumulator), windowTimestamp, currentWindow,
PaneInfo.NO_FIRING));
// re-init accumulator, window and timestamp.
- accumulator = combineFn.createAccumulator(key, ctxtForInput(nextValue));
- accumulator = combineFn.addInput(key, accumulator, nextValue.getValue().getValue(),
+ accumulator = combineFn.createAccumulator(ctxtForInput(nextValue));
+ accumulator = combineFn.addInput(accumulator, nextValue.getValue().getValue(),
ctxtForInput(nextValue));
currentWindow = nextWindow;
windowTimestamp =
@@ -233,7 +232,7 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
WindowedValue<KV<K, Iterable<AccumT>>> preMergeWindowedValue = WindowedValue.of(
KV.of(key, accumsToMerge), mergedTimestamp, currentWindow, PaneInfo.NO_FIRING);
// applying the actual combiner onto the accumulators.
- AccumT accumulated = combineFn.mergeAccumulators(key, accumsToMerge,
+ AccumT accumulated = combineFn.mergeAccumulators(accumsToMerge,
ctxtForInput(preMergeWindowedValue));
WindowedValue<KV<K, AccumT>> postMergeWindowedValue =
preMergeWindowedValue.withValue(KV.of(key, accumulated));
@@ -254,7 +253,7 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
Iterable<AccumT> accumsToMerge = Iterables.unmodifiableIterable(currentWindowAccumulators);
WindowedValue<KV<K, Iterable<AccumT>>> preMergeWindowedValue = WindowedValue.of(
KV.of(key, accumsToMerge), mergedTimestamp, currentWindow, PaneInfo.NO_FIRING);
- AccumT accumulated = combineFn.mergeAccumulators(key, accumsToMerge,
+ AccumT accumulated = combineFn.mergeAccumulators(accumsToMerge,
ctxtForInput(preMergeWindowedValue));
WindowedValue<KV<K, AccumT>> postMergeWindowedValue =
preMergeWindowedValue.withValue(KV.of(key, accumulated));
@@ -272,9 +271,8 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
if (wkva == null) {
return null;
}
- K key = wkva.getValue().getKey();
AccumT accumulator = wkva.getValue().getValue();
- return wkva.withValue(combineFn.extractOutput(key, accumulator, ctxtForInput(wkva)));
+ return wkva.withValue(combineFn.extractOutput(accumulator, ctxtForInput(wkva)));
}
});
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index c2a8b06..d249e78 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -29,7 +29,6 @@ import com.google.common.collect.Maps;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
-import java.util.Map.Entry;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
import org.apache.beam.runners.spark.aggregators.NamedAggregators;
@@ -165,8 +164,8 @@ public final class TransformTranslator {
Combine.GroupedValues<K, InputT, OutputT> transform,
EvaluationContext context) {
@SuppressWarnings("unchecked")
- CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, OutputT> combineFn =
- (CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, OutputT>)
+ CombineWithContext.CombineFnWithContext<InputT, ?, OutputT> combineFn =
+ (CombineWithContext.CombineFnWithContext<InputT, ?, OutputT>)
CombineFnUtil.toFnWithContext(transform.getFn());
final SparkKeyedCombineFn<K, InputT, ?, OutputT> sparkCombineFn =
new SparkKeyedCombineFn<>(combineFn, context.getRuntimeContext(),
@@ -282,16 +281,15 @@ public final class TransformTranslator {
return new TransformEvaluator<Combine.PerKey<K, InputT, OutputT>>() {
@Override
public void evaluate(
- Combine.PerKey<K, InputT, OutputT> transform,
- EvaluationContext context) {
+ Combine.PerKey<K, InputT, OutputT> transform, EvaluationContext context) {
final PCollection<KV<K, InputT>> input = context.getInput(transform);
// serializable arguments to pass.
@SuppressWarnings("unchecked")
final KvCoder<K, InputT> inputCoder =
(KvCoder<K, InputT>) context.getInput(transform).getCoder();
@SuppressWarnings("unchecked")
- final CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn =
- (CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>)
+ final CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn =
+ (CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT>)
CombineFnUtil.toFnWithContext(transform.getFn());
final WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
@@ -301,8 +299,9 @@ public final class TransformTranslator {
new SparkKeyedCombineFn<>(combineFn, runtimeContext, sideInputs, windowingStrategy);
final Coder<AccumT> vaCoder;
try {
- vaCoder = combineFn.getAccumulatorCoder(runtimeContext.getCoderRegistry(),
- inputCoder.getKeyCoder(), inputCoder.getValueCoder());
+ vaCoder =
+ combineFn.getAccumulatorCoder(
+ runtimeContext.getCoderRegistry(), inputCoder.getValueCoder());
} catch (CannotProvideCoderException e) {
throw new IllegalStateException("Could not determine coder for accumulator", e);
}
@@ -312,19 +311,28 @@ public final class TransformTranslator {
((BoundedDataset<KV<K, InputT>>) context.borrowDataset(transform)).getRDD();
JavaPairRDD<K, Iterable<WindowedValue<KV<K, AccumT>>>> accumulatePerKey =
- GroupCombineFunctions.combinePerKey(inRdd, sparkCombineFn, inputCoder.getKeyCoder(),
- inputCoder.getValueCoder(), vaCoder, windowingStrategy);
+ GroupCombineFunctions.combinePerKey(
+ inRdd,
+ sparkCombineFn,
+ inputCoder.getKeyCoder(),
+ inputCoder.getValueCoder(),
+ vaCoder,
+ windowingStrategy);
JavaRDD<WindowedValue<KV<K, OutputT>>> outRdd =
- accumulatePerKey.flatMapValues(new Function<Iterable<WindowedValue<KV<K, AccumT>>>,
- Iterable<WindowedValue<OutputT>>>() {
- @Override
- public Iterable<WindowedValue<OutputT>> call(
- Iterable<WindowedValue<KV<K, AccumT>>> iter) throws Exception {
+ accumulatePerKey
+ .flatMapValues(
+ new Function<
+ Iterable<WindowedValue<KV<K, AccumT>>>,
+ Iterable<WindowedValue<OutputT>>>() {
+ @Override
+ public Iterable<WindowedValue<OutputT>> call(
+ Iterable<WindowedValue<KV<K, AccumT>>> iter) throws Exception {
return sparkCombineFn.extractOutput(iter);
}
- }).map(TranslationUtils.<K, WindowedValue<OutputT>>fromPairFunction())
- .map(TranslationUtils.<K, OutputT>toKVByWindowInValue());
+ })
+ .map(TranslationUtils.<K, WindowedValue<OutputT>>fromPairFunction())
+ .map(TranslationUtils.<K, OutputT>toKVByWindowInValue());
context.putDataset(transform, new BoundedDataset<>(outRdd));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 26f0ade..9af4af2 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -327,8 +327,8 @@ public final class StreamingTransformTranslator {
context.getInput(transform);
final WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
@SuppressWarnings("unchecked")
- final CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, OutputT> fn =
- (CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, OutputT>)
+ final CombineWithContext.CombineFnWithContext<InputT, ?, OutputT> fn =
+ (CombineWithContext.CombineFnWithContext<InputT, ?, OutputT>)
CombineFnUtil.toFnWithContext(transform.getFn());
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
index ff43fa6..ce52b90 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
@@ -93,10 +93,9 @@ public class SparkRunnerDebuggerTest {
final String expectedPipeline = "sparkContext.parallelize(Arrays.asList(...))\n"
+ "_.mapPartitions(new org.apache.beam.runners.spark.examples.WordCount$ExtractWordsFn())\n"
+ "_.mapPartitions(new org.apache.beam.sdk.transforms.Count$PerElement$1())\n"
- + "_.combineByKey(..., new org.apache.beam.sdk.transforms"
- + ".Combine$CombineFn$KeyIgnoringCombineFn(), ...)\n"
+ + "_.combineByKey(..., new org.apache.beam.sdk.transforms.Count$CountFn(), ...)\n"
+ "_.groupByKey()\n"
- + "_.map(new org.apache.beam.sdk.transforms.Combine$CombineFn$KeyIgnoringCombineFn())\n"
+ + "_.map(new org.apache.beam.sdk.transforms.Sum$SumLongFn())\n"
+ "_.mapPartitions(new org.apache.beam.runners.spark"
+ ".SparkRunnerDebuggerTest$PlusOne())\n"
+ "sparkContext.union(...)\n"
@@ -145,7 +144,7 @@ public class SparkRunnerDebuggerTest {
+ "SparkRunnerDebuggerTest$FormatKVFn())\n"
+ "_.mapPartitions(new org.apache.beam.sdk.transforms.Distinct$2())\n"
+ "_.groupByKey()\n"
- + "_.map(new org.apache.beam.sdk.transforms.Combine$CombineFn$KeyIgnoringCombineFn())\n"
+ + "_.map(new org.apache.beam.sdk.transforms.Combine$IterableCombineFn())\n"
+ "_.mapPartitions(new org.apache.beam.sdk.transforms.Keys$1())\n"
+ "_.mapPartitions(new org.apache.beam.sdk.transforms.WithKeys$2())\n"
+ "_.<org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Write>";
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
index cf7d668..d03fbf3 100644
--- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
@@ -361,7 +361,7 @@
<!--[BEAM-413] Test for floating point equality-->
</Match>
<Match>
- <Class name="org.apache.beam.sdk.util.CombineFnUtil$NonSerializableBoundedKeyedCombineFn"/>
+ <Class name="org.apache.beam.sdk.util.CombineFnUtil$NonSerializableBoundedCombineFn"/>
<Field name="context"/>
<Bug pattern="SE_BAD_FIELD"/>
<!--[BEAM-419] Non-transient non-serializable instance field in serializable class-->
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
index ed3a253..5432f09 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
@@ -155,9 +155,7 @@ public class ApproximateQuantiles {
public static <K, V, ComparatorT extends Comparator<V> & Serializable>
PTransform<PCollection<KV<K, V>>, PCollection<KV<K, List<V>>>>
perKey(int numQuantiles, ComparatorT compareFn) {
- return Combine.perKey(
- ApproximateQuantilesCombineFn.create(numQuantiles, compareFn)
- .<K>asKeyedFn());
+ return Combine.perKey(ApproximateQuantilesCombineFn.create(numQuantiles, compareFn));
}
/**
@@ -173,9 +171,7 @@ public class ApproximateQuantiles {
public static <K, V extends Comparable<V>>
PTransform<PCollection<KV<K, V>>, PCollection<KV<K, List<V>>>>
perKey(int numQuantiles) {
- return Combine.perKey(
- ApproximateQuantilesCombineFn.<V>create(numQuantiles)
- .<K>asKeyedFn());
+ return Combine.perKey(ApproximateQuantilesCombineFn.<V>create(numQuantiles));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
index 33820e0..5d38206 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
@@ -281,8 +281,7 @@ public class ApproximateUnique {
final Coder<V> coder = ((KvCoder<K, V>) inputCoder).getValueCoder();
return input.apply(
- Combine.perKey(new ApproximateUniqueCombineFn<>(
- sampleSize, coder).<K>asKeyedFn()));
+ Combine.<K, V, Long>perKey(new ApproximateUniqueCombineFn<>(sampleSize, coder)));
}
@Override
[2/6] beam git commit: Remove KeyedCombineFn
Posted by ke...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AppliedCombineFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AppliedCombineFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AppliedCombineFn.java
index 30b302c..b16aadc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AppliedCombineFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AppliedCombineFn.java
@@ -23,13 +23,12 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
import org.apache.beam.sdk.values.PCollectionView;
/**
- * A {@link KeyedCombineFnWithContext} with a fixed accumulator coder. This is created from a
- * specific application of the {@link KeyedCombineFnWithContext}.
+ * A {@link GlobalCombineFn} with a fixed accumulator coder. This is created from a
+ * specific application of the {@link GlobalCombineFn}.
*
* <p>Because the {@code AccumT} may reference {@code InputT}, the specific {@code Coder<AccumT>}
* may depend on the {@code Coder<InputT>}.
@@ -41,14 +40,14 @@ import org.apache.beam.sdk.values.PCollectionView;
*/
public class AppliedCombineFn<K, InputT, AccumT, OutputT> implements Serializable {
- private final PerKeyCombineFn<K, InputT, AccumT, OutputT> fn;
+ private final GlobalCombineFn<InputT, AccumT, OutputT> fn;
private final Coder<AccumT> accumulatorCoder;
private final Iterable<PCollectionView<?>> sideInputViews;
private final KvCoder<K, InputT> kvCoder;
private final WindowingStrategy<?, ?> windowingStrategy;
- private AppliedCombineFn(PerKeyCombineFn<K, InputT, AccumT, OutputT> fn,
+ private AppliedCombineFn(GlobalCombineFn<InputT, AccumT, OutputT> fn,
Coder<AccumT> accumulatorCoder, Iterable<PCollectionView<?>> sideInputViews,
KvCoder<K, InputT> kvCoder, WindowingStrategy<?, ?> windowingStrategy) {
this.fn = fn;
@@ -60,41 +59,41 @@ public class AppliedCombineFn<K, InputT, AccumT, OutputT> implements Serializabl
public static <K, InputT, AccumT, OutputT> AppliedCombineFn<K, InputT, AccumT, OutputT>
withAccumulatorCoder(
- PerKeyCombineFn<? super K, ? super InputT, AccumT, OutputT> fn,
+ GlobalCombineFn<? super InputT, AccumT, OutputT> fn,
Coder<AccumT> accumCoder) {
return withAccumulatorCoder(fn, accumCoder, null, null, null);
}
public static <K, InputT, AccumT, OutputT> AppliedCombineFn<K, InputT, AccumT, OutputT>
withAccumulatorCoder(
- PerKeyCombineFn<? super K, ? super InputT, AccumT, OutputT> fn,
+ GlobalCombineFn<? super InputT, AccumT, OutputT> fn,
Coder<AccumT> accumCoder, Iterable<PCollectionView<?>> sideInputViews,
KvCoder<K, InputT> kvCoder, WindowingStrategy<?, ?> windowingStrategy) {
// Casting down the K and InputT is safe because they're only used as inputs.
@SuppressWarnings("unchecked")
- PerKeyCombineFn<K, InputT, AccumT, OutputT> clonedFn =
- (PerKeyCombineFn<K, InputT, AccumT, OutputT>) SerializableUtils.clone(fn);
+ GlobalCombineFn<InputT, AccumT, OutputT> clonedFn =
+ (GlobalCombineFn<InputT, AccumT, OutputT>) SerializableUtils.clone(fn);
return create(clonedFn, accumCoder, sideInputViews, kvCoder, windowingStrategy);
}
@VisibleForTesting
public static <K, InputT, AccumT, OutputT> AppliedCombineFn<K, InputT, AccumT, OutputT>
- withInputCoder(PerKeyCombineFn<? super K, ? super InputT, AccumT, OutputT> fn,
+ withInputCoder(GlobalCombineFn<? super InputT, AccumT, OutputT> fn,
CoderRegistry registry, KvCoder<K, InputT> kvCoder) {
return withInputCoder(fn, registry, kvCoder, null, null);
}
public static <K, InputT, AccumT, OutputT> AppliedCombineFn<K, InputT, AccumT, OutputT>
- withInputCoder(PerKeyCombineFn<? super K, ? super InputT, AccumT, OutputT> fn,
+ withInputCoder(GlobalCombineFn<? super InputT, AccumT, OutputT> fn,
CoderRegistry registry, KvCoder<K, InputT> kvCoder,
Iterable<PCollectionView<?>> sideInputViews, WindowingStrategy<?, ?> windowingStrategy) {
// Casting down the K and InputT is safe because they're only used as inputs.
@SuppressWarnings("unchecked")
- PerKeyCombineFn<K, InputT, AccumT, OutputT> clonedFn =
- (PerKeyCombineFn<K, InputT, AccumT, OutputT>) SerializableUtils.clone(fn);
+ GlobalCombineFn<InputT, AccumT, OutputT> clonedFn =
+ (GlobalCombineFn<InputT, AccumT, OutputT>) SerializableUtils.clone(fn);
try {
- Coder<AccumT> accumulatorCoder = clonedFn.getAccumulatorCoder(
- registry, kvCoder.getKeyCoder(), kvCoder.getValueCoder());
+ Coder<AccumT> accumulatorCoder =
+ clonedFn.getAccumulatorCoder(registry, kvCoder.getValueCoder());
return create(clonedFn, accumulatorCoder, sideInputViews, kvCoder, windowingStrategy);
} catch (CannotProvideCoderException e) {
throw new IllegalStateException("Could not determine coder for accumulator", e);
@@ -102,14 +101,14 @@ public class AppliedCombineFn<K, InputT, AccumT, OutputT> implements Serializabl
}
private static <K, InputT, AccumT, OutputT> AppliedCombineFn<K, InputT, AccumT, OutputT> create(
- PerKeyCombineFn<K, InputT, AccumT, OutputT> fn,
+ GlobalCombineFn<InputT, AccumT, OutputT> fn,
Coder<AccumT> accumulatorCoder, Iterable<PCollectionView<?>> sideInputViews,
KvCoder<K, InputT> kvCoder, WindowingStrategy<?, ?> windowingStrategy) {
return new AppliedCombineFn<>(
fn, accumulatorCoder, sideInputViews, kvCoder, windowingStrategy);
}
- public PerKeyCombineFn<K, InputT, AccumT, OutputT> getFn() {
+ public GlobalCombineFn<InputT, AccumT, OutputT> getFn() {
return fn;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java
index a9a0178..a394180 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java
@@ -24,12 +24,9 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
-import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.CombineWithContext.Context;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.state.StateContext;
@@ -37,20 +34,21 @@ import org.apache.beam.sdk.util.state.StateContext;
* Static utility methods that create combine function instances.
*/
public class CombineFnUtil {
+
/**
- * Returns the partial application of the {@link KeyedCombineFnWithContext} to a specific
- * context to produce a {@link KeyedCombineFn}.
+ * Returns the partial application of the {@link CombineFnWithContext} to a specific context
+ * to produce a {@link CombineFn}.
*
- * <p>The returned {@link KeyedCombineFn} cannot be serialized.
+ * <p>The returned {@link CombineFn} cannot be serialized.
*/
- public static <K, InputT, AccumT, OutputT> KeyedCombineFn<K, InputT, AccumT, OutputT>
- bindContext(
- KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn,
+ public static <K, InputT, AccumT, OutputT> CombineFn<InputT, AccumT, OutputT> bindContext(
+ CombineFnWithContext<InputT, AccumT, OutputT> combineFn,
StateContext<?> stateContext) {
Context context = CombineContextFactory.createFromStateContext(stateContext);
- return new NonSerializableBoundedKeyedCombineFn<>(combineFn, context);
+ return new NonSerializableBoundedCombineFn<>(combineFn, context);
}
+
/**
* Return a {@link CombineFnWithContext} from the given {@link GlobalCombineFn}.
*/
@@ -110,100 +108,55 @@ public class CombineFnUtil {
}
}
- /**
- * Return a {@link KeyedCombineFnWithContext} from the given {@link PerKeyCombineFn}.
- */
- public static <K, InputT, AccumT, OutputT> KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>
- toFnWithContext(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) {
- if (perKeyCombineFn instanceof KeyedCombineFnWithContext) {
- @SuppressWarnings("unchecked")
- KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext =
- (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) perKeyCombineFn;
- return keyedCombineFnWithContext;
- } else {
- @SuppressWarnings("unchecked")
- final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn =
- (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn;
- return new KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>() {
- @Override
- public AccumT createAccumulator(K key, Context c) {
- return keyedCombineFn.createAccumulator(key);
- }
- @Override
- public AccumT addInput(K key, AccumT accumulator, InputT value, Context c) {
- return keyedCombineFn.addInput(key, accumulator, value);
- }
- @Override
- public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, Context c) {
- return keyedCombineFn.mergeAccumulators(key, accumulators);
- }
- @Override
- public OutputT extractOutput(K key, AccumT accumulator, Context c) {
- return keyedCombineFn.extractOutput(key, accumulator);
- }
- @Override
- public AccumT compact(K key, AccumT accumulator, Context c) {
- return keyedCombineFn.compact(key, accumulator);
- }
- @Override
- public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder,
- Coder<InputT> inputCoder) throws CannotProvideCoderException {
- return keyedCombineFn.getAccumulatorCoder(registry, keyCoder, inputCoder);
- }
- @Override
- public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<K> keyCoder,
- Coder<InputT> inputCoder) throws CannotProvideCoderException {
- return keyedCombineFn.getDefaultOutputCoder(registry, keyCoder, inputCoder);
- }
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- keyedCombineFn.populateDisplayData(builder);
- }
- };
- }
- }
-
- private static class NonSerializableBoundedKeyedCombineFn<K, InputT, AccumT, OutputT>
- extends KeyedCombineFn<K, InputT, AccumT, OutputT> {
- private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn;
+ private static class NonSerializableBoundedCombineFn<InputT, AccumT, OutputT>
+ extends CombineFn<InputT, AccumT, OutputT> {
+ private final CombineFnWithContext<InputT, AccumT, OutputT> combineFn;
private final Context context;
- private NonSerializableBoundedKeyedCombineFn(
- KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn,
- Context context) {
+ private NonSerializableBoundedCombineFn(
+ CombineFnWithContext<InputT, AccumT, OutputT> combineFn, Context context) {
this.combineFn = combineFn;
this.context = context;
}
+
@Override
- public AccumT createAccumulator(K key) {
- return combineFn.createAccumulator(key, context);
+ public AccumT createAccumulator() {
+ return combineFn.createAccumulator(context);
}
+
@Override
- public AccumT addInput(K key, AccumT accumulator, InputT value) {
- return combineFn.addInput(key, accumulator, value, context);
+ public AccumT addInput(AccumT accumulator, InputT value) {
+ return combineFn.addInput(accumulator, value, context);
}
+
@Override
- public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators) {
- return combineFn.mergeAccumulators(key, accumulators, context);
+ public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
+ return combineFn.mergeAccumulators(accumulators, context);
}
+
@Override
- public OutputT extractOutput(K key, AccumT accumulator) {
- return combineFn.extractOutput(key, accumulator, context);
+ public OutputT extractOutput(AccumT accumulator) {
+ return combineFn.extractOutput(accumulator, context);
}
+
@Override
- public AccumT compact(K key, AccumT accumulator) {
- return combineFn.compact(key, accumulator, context);
+ public AccumT compact(AccumT accumulator) {
+ return combineFn.compact(accumulator, context);
}
+
@Override
- public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder,
- Coder<InputT> inputCoder) throws CannotProvideCoderException {
- return combineFn.getAccumulatorCoder(registry, keyCoder, inputCoder);
+ public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<InputT> inputCoder)
+ throws CannotProvideCoderException {
+ return combineFn.getAccumulatorCoder(registry, inputCoder);
}
+
@Override
- public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<K> keyCoder,
- Coder<InputT> inputCoder) throws CannotProvideCoderException {
- return combineFn.getDefaultOutputCoder(registry, keyCoder, inputCoder);
+ public Coder<OutputT> getDefaultOutputCoder(
+ CoderRegistry registry, Coder<InputT> inputCoder)
+ throws CannotProvideCoderException {
+ return combineFn.getDefaultOutputCoder(registry, inputCoder);
}
+
@Override
public void populateDisplayData(DisplayData.Builder builder) {
combineFn.populateDisplayData(builder);
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
index f9ab115..6fe37a1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
@@ -45,20 +45,11 @@ public interface StateBinder<K> {
Coder<AccumT> accumCoder,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn);
- <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindKeyedCombining(
- String id,
- StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
- Coder<AccumT> accumCoder,
- Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn);
-
- <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningWithContext(
- String id,
- StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
- Coder<AccumT> accumCoder,
- CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT>
- combineFn);
+ <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningWithContext(
+ String id,
+ StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
+ Coder<AccumT> accumCoder,
+ CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn);
/**
* Bind to a watermark {@link StateSpec}.
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
index 8fa5bb0..a057a0b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
@@ -27,8 +27,7 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
@@ -77,52 +76,16 @@ public class StateSpecs {
}
/**
- * Create a state spec for values that use a {@link KeyedCombineFn} to automatically merge
+ * Create a state spec for values that use a {@link CombineFnWithContext} to automatically merge
* multiple {@code InputT}s into a single {@code OutputT}.
*/
- public static <K, InputT, AccumT, OutputT>
- StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombining(
- KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
- return new KeyedCombiningStateSpec<K, InputT, AccumT, OutputT>(null, combineFn);
- }
-
- /**
- * Create a state spec for values that use a {@link KeyedCombineFn} to automatically merge
- * multiple {@code InputT}s into a single {@code OutputT}.
- */
- public static <K, InputT, AccumT, OutputT>
- StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombining(
- Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
+ public static <InputT, AccumT, OutputT>
+ StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combining(
+ Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
checkArgument(accumCoder != null,
"accumCoder should not be null. "
- + "Consider using keyedCombining(KeyedCombineFn<> combineFn) instead.");
- return keyedCombiningInternal(accumCoder, combineFn);
- }
-
- /**
- * Create a state spec for values that use a {@link KeyedCombineFnWithContext} to automatically
- * merge multiple {@code InputT}s into a single {@code OutputT}.
- */
- public static <K, InputT, AccumT, OutputT>
- StateSpec<K, CombiningState<InputT, AccumT, OutputT>>
- keyedCombiningWithContext(KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
- return new KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT>(null, combineFn);
- }
-
- /**
- * Create a state spec for values that use a {@link KeyedCombineFnWithContext} to automatically
- * merge multiple {@code InputT}s into a single {@code OutputT}.
- */
- public static <K, InputT, AccumT, OutputT>
- StateSpec<K, CombiningState<InputT, AccumT, OutputT>>
- keyedCombiningWithContext(
- Coder<AccumT> accumCoder,
- KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
- checkArgument(accumCoder != null,
- "accumCoder should not be null. Consider using "
- + "keyedCombiningWithContext(KeyedCombineFnWithContext<> combineFn) instead.");
- return new KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT>(
- accumCoder, combineFn);
+ + "Consider using combining(CombineFn<> combineFn) instead.");
+ return combiningInternal(accumCoder, combineFn);
}
/**
@@ -155,10 +118,10 @@ public class StateSpecs {
return new CombiningStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn);
}
- private static <K, InputT, AccumT, OutputT>
- StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningInternal(
- Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
- return new KeyedCombiningStateSpec<K, InputT, AccumT, OutputT>(accumCoder, combineFn);
+ private static <InputT, AccumT, OutputT>
+ StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningInternal(
+ Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+ return new CombiningWithContextStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn);
}
/**
@@ -216,17 +179,17 @@ public class StateSpecs {
public static <K, InputT, AccumT, OutputT>
StateSpec<Object, BagState<AccumT>> convertToBagSpecInternal(
StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> combiningSpec) {
- if (combiningSpec instanceof KeyedCombiningStateSpec) {
+ if (combiningSpec instanceof CombiningStateSpec) {
// Checked above; conversion to a bag spec depends on the provided spec being one of those
// created via the factory methods in this class.
@SuppressWarnings("unchecked")
- KeyedCombiningStateSpec<K, InputT, AccumT, OutputT> typedSpec =
- (KeyedCombiningStateSpec<K, InputT, AccumT, OutputT>) combiningSpec;
+ CombiningStateSpec<InputT, AccumT, OutputT> typedSpec =
+ (CombiningStateSpec<InputT, AccumT, OutputT>) combiningSpec;
return typedSpec.asBagSpec();
- } else if (combiningSpec instanceof KeyedCombiningWithContextStateSpec) {
+ } else if (combiningSpec instanceof CombiningWithContextStateSpec) {
@SuppressWarnings("unchecked")
- KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT> typedSpec =
- (KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT>) combiningSpec;
+ CombiningWithContextStateSpec<InputT, AccumT, OutputT> typedSpec =
+ (CombiningWithContextStateSpec<InputT, AccumT, OutputT>) combiningSpec;
return typedSpec.asBagSpec();
} else {
throw new IllegalArgumentException("Unexpected StateSpec " + combiningSpec);
@@ -297,7 +260,6 @@ public class StateSpecs {
* <p>Includes the {@link CombineFn} and the coder for the accumulator type.
*/
private static class CombiningStateSpec<InputT, AccumT, OutputT>
- extends KeyedCombiningStateSpec<Object, InputT, AccumT, OutputT>
implements StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> {
@Nullable
@@ -307,14 +269,14 @@ public class StateSpecs {
private CombiningStateSpec(
@Nullable Coder<AccumT> accumCoder,
CombineFn<InputT, AccumT, OutputT> combineFn) {
- super(accumCoder, combineFn.asKeyedFn());
this.combineFn = combineFn;
this.accumCoder = accumCoder;
}
@Override
- protected Coder<AccumT> getAccumCoder() {
- return accumCoder;
+ public CombiningState<InputT, AccumT, OutputT> bind(
+ String id, StateBinder<? extends Object> visitor) {
+ return visitor.bindCombining(id, this, accumCoder, combineFn);
}
@SuppressWarnings("unchecked")
@@ -326,51 +288,14 @@ public class StateSpecs {
}
}
}
- }
-
- /**
- * A specification for a state cell that is combined according to a
- * {@link KeyedCombineFnWithContext}.
- *
- * <p>Includes the {@link KeyedCombineFnWithContext} and the coder for the accumulator type.
- */
- private static class KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT>
- implements StateSpec<K, CombiningState<InputT, AccumT, OutputT>> {
-
- @Nullable
- private Coder<AccumT> accumCoder;
- private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn;
-
- protected KeyedCombiningWithContextStateSpec(
- @Nullable Coder<AccumT> accumCoder,
- KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
- this.combineFn = combineFn;
- this.accumCoder = accumCoder;
- }
-
- @Override
- public CombiningState<InputT, AccumT, OutputT> bind(
- String id, StateBinder<? extends K> visitor) {
- return visitor.bindKeyedCombiningWithContext(id, this, accumCoder, combineFn);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void offerCoders(Coder[] coders) {
- if (this.accumCoder == null) {
- if (coders[2] != null) {
- this.accumCoder = (Coder<AccumT>) coders[2];
- }
- }
- }
@Override public void finishSpecifying() {
if (accumCoder == null) {
throw new IllegalStateException("Unable to infer a coder for"
- + " KeyedCombiningWithContextState and no Coder was specified."
+ + " CombiningState and no Coder was specified."
+ " Please set a coder by either invoking"
- + " StateSpecs.keyedCombining(Coder<AccumT> accumCoder,"
- + " KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn)"
+ + " StateSpecs.combining(Coder<AccumT> accumCoder,"
+ + " CombineFn<InputT, AccumT, OutputT> combineFn)"
+ " or by registering the coder in the Pipeline's CoderRegistry.");
}
}
@@ -381,12 +306,12 @@ public class StateSpecs {
return true;
}
- if (!(obj instanceof KeyedCombiningWithContextStateSpec)) {
+ if (!(obj instanceof CombiningStateSpec)) {
return false;
}
- KeyedCombiningWithContextStateSpec<?, ?, ?, ?> that =
- (KeyedCombiningWithContextStateSpec<?, ?, ?, ?>) obj;
+ CombiningStateSpec<?, ?, ?> that =
+ (CombiningStateSpec<?, ?, ?>) obj;
return Objects.equals(this.accumCoder, that.accumCoder);
}
@@ -401,32 +326,28 @@ public class StateSpecs {
}
/**
- * A specification for a state cell that is combined according to a {@link KeyedCombineFn}.
+ * A specification for a state cell that is combined according to a {@link
+ * CombineFnWithContext}.
*
- * <p>Includes the {@link KeyedCombineFn} and the coder for the accumulator type.
+ * <p>Includes the {@link CombineFnWithContext} and the coder for the accumulator type.
*/
- private static class KeyedCombiningStateSpec<K, InputT, AccumT, OutputT>
- implements StateSpec<K, CombiningState<InputT, AccumT, OutputT>> {
+ private static class CombiningWithContextStateSpec<InputT, AccumT, OutputT>
+ implements StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> {
- @Nullable
- private Coder<AccumT> accumCoder;
- private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn;
+ @Nullable private Coder<AccumT> accumCoder;
+ private final CombineFnWithContext<InputT, AccumT, OutputT> combineFn;
- protected KeyedCombiningStateSpec(
+ private CombiningWithContextStateSpec(
@Nullable Coder<AccumT> accumCoder,
- KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) {
- this.keyedCombineFn = keyedCombineFn;
+ CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+ this.combineFn = combineFn;
this.accumCoder = accumCoder;
}
- protected Coder<AccumT> getAccumCoder() {
- return accumCoder;
- }
-
@Override
public CombiningState<InputT, AccumT, OutputT> bind(
- String id, StateBinder<? extends K> visitor) {
- return visitor.bindKeyedCombining(id, this, getAccumCoder(), keyedCombineFn);
+ String id, StateBinder<? extends Object> visitor) {
+ return visitor.bindCombiningWithContext(id, this, accumCoder, combineFn);
}
@SuppressWarnings("unchecked")
@@ -439,13 +360,16 @@ public class StateSpecs {
}
}
- @Override public void finishSpecifying() {
- if (getAccumCoder() == null) {
- throw new IllegalStateException("Unable to infer a coder for GroupingState and no"
- + " Coder was specified. Please set a coder by either invoking"
- + " StateSpecs.combining(Coder<AccumT> accumCoder,"
- + " CombineFn<InputT, AccumT, OutputT> combineFn)"
- + " or by registering the coder in the Pipeline's CoderRegistry.");
+ @Override
+ public void finishSpecifying() {
+ if (accumCoder == null) {
+ throw new IllegalStateException(
+ "Unable to infer a coder for"
+ + " CombiningWithContextState and no Coder was specified."
+ + " Please set a coder by either invoking"
+ + " StateSpecs.combiningWithcontext(Coder<AccumT> accumCoder,"
+ + " CombineFnWithContext<InputT, AccumT, OutputT> combineFn)"
+ + " or by registering the coder in the Pipeline's CoderRegistry.");
}
}
@@ -455,12 +379,11 @@ public class StateSpecs {
return true;
}
- if (!(obj instanceof CombiningStateSpec)) {
+ if (!(obj instanceof CombiningWithContextStateSpec)) {
return false;
}
- KeyedCombiningStateSpec<?, ?, ?, ?> that =
- (KeyedCombiningStateSpec<?, ?, ?, ?>) obj;
+ CombiningWithContextStateSpec<?, ?, ?> that = (CombiningWithContextStateSpec<?, ?, ?>) obj;
return Objects.equals(this.accumCoder, that.accumCoder);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
index 13c5f16..dcb8fdc 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
@@ -40,7 +40,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn;
import org.apache.beam.sdk.transforms.CombineFns.CoCombineResult;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -79,7 +79,7 @@ public class CombineFnsTest {
expectedException.expectMessage("it is already present in the composition");
TupleTag<Integer> tag = new TupleTag<Integer>();
- CombineFns.composeKeyed()
+ CombineFns.compose()
.with(new GetIntegerFunction(), Max.ofIntegers(), tag)
.with(new GetIntegerFunction(), Min.ofIntegers(), tag);
}
@@ -93,23 +93,6 @@ public class CombineFnsTest {
CombineFns.compose()
.with(
new GetUserStringFunction(),
- new ConcatStringWithContext(null /* view */).forKey("G", StringUtf8Coder.of()),
- tag)
- .with(
- new GetUserStringFunction(),
- new ConcatStringWithContext(null /* view */).forKey("G", StringUtf8Coder.of()),
- tag);
- }
-
- @Test
- public void testDuplicatedTagsWithContextKeyed() {
- expectedException.expect(IllegalArgumentException.class);
- expectedException.expectMessage("it is already present in the composition");
-
- TupleTag<UserString> tag = new TupleTag<UserString>();
- CombineFns.composeKeyed()
- .with(
- new GetUserStringFunction(),
new ConcatStringWithContext(null /* view */),
tag)
.with(
@@ -153,17 +136,15 @@ public class CombineFnsTest {
.apply(
"ExtractGloballyResult", ParDo.of(new ExtractResultDoFn(maxIntTag, concatStringTag)));
- PCollection<KV<String, KV<Integer, String>>> combinePerKey = perKeyInput
- .apply(Combine.perKey(CombineFns.composeKeyed()
- .with(
- new GetIntegerFunction(),
- Max.ofIntegers().<String>asKeyedFn(),
- maxIntTag)
- .with(
- new GetUserStringFunction(),
- new ConcatString().<String>asKeyedFn(),
- concatStringTag)))
- .apply("ExtractPerKeyResult", ParDo.of(new ExtractResultDoFn(maxIntTag, concatStringTag)));
+ PCollection<KV<String, KV<Integer, String>>> combinePerKey =
+ perKeyInput
+ .apply(
+ Combine.<String, KV<Integer, UserString>, CoCombineResult>perKey(
+ CombineFns.compose()
+ .with(new GetIntegerFunction(), Max.ofIntegers(), maxIntTag)
+ .with(new GetUserStringFunction(), new ConcatString(), concatStringTag)))
+ .apply(
+ "ExtractPerKeyResult", ParDo.of(new ExtractResultDoFn(maxIntTag, concatStringTag)));
PAssert.that(combineGlobally).containsInAnyOrder(
KV.of("global", KV.of(13, "111134")));
PAssert.that(combinePerKey).containsInAnyOrder(
@@ -205,7 +186,7 @@ public class CombineFnsTest {
maxIntTag)
.with(
new GetUserStringFunction(),
- new ConcatStringWithContext(view).forKey("G", StringUtf8Coder.of()),
+ new ConcatStringWithContext(view),
concatStringTag))
.withoutDefaults()
.withSideInputs(ImmutableList.of(view)))
@@ -213,23 +194,24 @@ public class CombineFnsTest {
.apply(
"ExtractGloballyResult", ParDo.of(new ExtractResultDoFn(maxIntTag, concatStringTag)));
- PCollection<KV<String, KV<Integer, String>>> combinePerKey = perKeyInput
- .apply(Combine.perKey(CombineFns.composeKeyed()
- .with(
- new GetIntegerFunction(),
- Max.ofIntegers().<String>asKeyedFn(),
- maxIntTag)
- .with(
- new GetUserStringFunction(),
- new ConcatStringWithContext(view),
- concatStringTag))
- .withSideInputs(ImmutableList.of(view)))
- .apply("ExtractPerKeyResult", ParDo.of(new ExtractResultDoFn(maxIntTag, concatStringTag)));
+ PCollection<KV<String, KV<Integer, String>>> combinePerKey =
+ perKeyInput
+ .apply(
+ Combine.<String, KV<Integer, UserString>, CoCombineResult>perKey(
+ CombineFns.compose()
+ .with(new GetIntegerFunction(), Max.ofIntegers(), maxIntTag)
+ .with(
+ new GetUserStringFunction(),
+ new ConcatStringWithContext(view),
+ concatStringTag))
+ .withSideInputs(ImmutableList.of(view)))
+ .apply(
+ "ExtractPerKeyResult", ParDo.of(new ExtractResultDoFn(maxIntTag, concatStringTag)));
PAssert.that(combineGlobally).containsInAnyOrder(
- KV.of("global", KV.of(13, "111134GI")));
+ KV.of("global", KV.of(13, "111134I")));
PAssert.that(combinePerKey).containsInAnyOrder(
- KV.of("a", KV.of(4, "114Ia")),
- KV.of("b", KV.of(13, "113Ib")));
+ KV.of("a", KV.of(4, "114I")),
+ KV.of("b", KV.of(13, "113I")));
p.run();
}
@@ -256,17 +238,16 @@ public class CombineFnsTest {
TupleTag<Integer> maxIntTag = new TupleTag<Integer>();
TupleTag<UserString> concatStringTag = new TupleTag<UserString>();
- PCollection<KV<String, KV<Integer, String>>> combinePerKey = perKeyInput
- .apply(Combine.perKey(CombineFns.composeKeyed()
- .with(
- new GetIntegerFunction(),
- Max.ofIntegers().<String>asKeyedFn(),
- maxIntTag)
- .with(
- new GetUserStringFunction(),
- new OutputNullString().<String>asKeyedFn(),
- concatStringTag)))
- .apply("ExtractPerKeyResult", ParDo.of(new ExtractResultDoFn(maxIntTag, concatStringTag)));
+ PCollection<KV<String, KV<Integer, String>>> combinePerKey =
+ perKeyInput
+ .apply(
+ Combine.<String, KV<Integer, UserString>, CoCombineResult>perKey(
+ CombineFns.compose()
+ .with(new GetIntegerFunction(), Max.ofIntegers(), maxIntTag)
+ .with(
+ new GetUserStringFunction(), new OutputNullString(), concatStringTag)))
+ .apply(
+ "ExtractPerKeyResult", ParDo.of(new ExtractResultDoFn(maxIntTag, concatStringTag)));
PAssert.that(combinePerKey).containsInAnyOrder(
KV.of("a", KV.of(4, (String) null)),
KV.of("b", KV.of(13, (String) null)));
@@ -407,7 +388,7 @@ public class CombineFnsTest {
}
private static class ConcatStringWithContext
- extends KeyedCombineFnWithContext<String, UserString, UserString, UserString> {
+ extends CombineFnWithContext<UserString, UserString, UserString> {
private final PCollectionView<String> view;
private ConcatStringWithContext(PCollectionView<String> view) {
@@ -415,22 +396,22 @@ public class CombineFnsTest {
}
@Override
- public UserString createAccumulator(String key, CombineWithContext.Context c) {
- return UserString.of(key + c.sideInput(view));
+ public UserString createAccumulator(CombineWithContext.Context c) {
+ return UserString.of(c.sideInput(view));
}
@Override
public UserString addInput(
- String key, UserString accumulator, UserString input, CombineWithContext.Context c) {
- assertThat(accumulator.strValue, Matchers.startsWith(key + c.sideInput(view)));
+ UserString accumulator, UserString input, CombineWithContext.Context c) {
+ assertThat(accumulator.strValue, Matchers.startsWith(c.sideInput(view)));
accumulator.strValue += input.strValue;
return accumulator;
}
@Override
public UserString mergeAccumulators(
- String key, Iterable<UserString> accumulators, CombineWithContext.Context c) {
- String keyPrefix = key + c.sideInput(view);
+ Iterable<UserString> accumulators, CombineWithContext.Context c) {
+ String keyPrefix = c.sideInput(view);
String all = keyPrefix;
for (UserString accumulator : accumulators) {
assertThat(accumulator.strValue, Matchers.startsWith(keyPrefix));
@@ -441,9 +422,8 @@ public class CombineFnsTest {
}
@Override
- public UserString extractOutput(
- String key, UserString accumulator, CombineWithContext.Context c) {
- assertThat(accumulator.strValue, Matchers.startsWith(key + c.sideInput(view)));
+ public UserString extractOutput(UserString accumulator, CombineWithContext.Context c) {
+ assertThat(accumulator.strValue, Matchers.startsWith(c.sideInput(view)));
char[] chars = accumulator.strValue.toCharArray();
Arrays.sort(chars);
return UserString.of(new String(chars));
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index a5f3df2..82c2504 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -18,7 +18,6 @@
package org.apache.beam.sdk.transforms;
import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static org.apache.beam.sdk.TestUtils.checkCombineFn;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
@@ -58,9 +57,10 @@ import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.ValidatesRunner;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.CombineTest.TestCombineFn.Accumulator;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.CombineWithContext.Context;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
@@ -127,7 +127,7 @@ public class CombineTest implements Serializable {
// Java 8 will infer.
PCollection<KV<String, String>> sumPerKey = input
- .apply(Combine.perKey(new TestKeyedCombineFn()));
+ .apply(Combine.<String, Integer, String>perKey(new TestCombineFn()));
PAssert.that(sum).containsInAnyOrder(globalSum);
PAssert.that(sumPerKey).containsInAnyOrder(perKeyCombines);
@@ -147,13 +147,13 @@ public class CombineTest implements Serializable {
PCollectionView<Integer> globallySumView = sum.apply(View.<Integer>asSingleton());
// Java 8 will infer.
- PCollection<KV<String, String>> combinePerKey = perKeyInput
- .apply(Combine.perKey(new TestKeyedCombineFnWithContext(globallySumView))
- .withSideInputs(Arrays.asList(globallySumView)));
+ PCollection<KV<String, String>> combinePerKey =
+ perKeyInput.apply(
+ Combine.<String, Integer, String>perKey(new TestCombineFnWithContext(globallySumView))
+ .withSideInputs(Arrays.asList(globallySumView)));
PCollection<String> combineGlobally = globallyInput
- .apply(Combine.globally(new TestKeyedCombineFnWithContext(globallySumView)
- .forKey("G", StringUtf8Coder.of()))
+ .apply(Combine.globally(new TestCombineFnWithContext(globallySumView))
.withoutDefaults()
.withSideInputs(Arrays.asList(globallySumView)));
@@ -168,7 +168,7 @@ public class CombineTest implements Serializable {
@Category(ValidatesRunner.class)
@SuppressWarnings({"rawtypes", "unchecked"})
public void testSimpleCombine() {
- runTestSimpleCombine(TABLE, 20, Arrays.asList(KV.of("a", "114a"), KV.of("b", "113b")));
+ runTestSimpleCombine(TABLE, 20, Arrays.asList(KV.of("a", "114"), KV.of("b", "113")));
}
@Test
@@ -176,8 +176,8 @@ public class CombineTest implements Serializable {
@SuppressWarnings({"rawtypes", "unchecked"})
public void testSimpleCombineWithContext() {
runTestSimpleCombineWithContext(TABLE, 20,
- Arrays.asList(KV.of("a", "01124a"), KV.of("b", "01123b")),
- new String[] {"01111234G"});
+ Arrays.asList(KV.of("a", "01124"), KV.of("b", "01123")),
+ new String[] {"01111234"});
}
@Test
@@ -260,14 +260,14 @@ public class CombineTest implements Serializable {
.apply(Combine.globally(new SumInts()).withoutDefaults());
PCollection<KV<String, String>> sumPerKey = input
- .apply(Combine.perKey(new TestKeyedCombineFn()));
+ .apply(Combine.<String, Integer, String>perKey(new TestCombineFn()));
PAssert.that(sum).containsInAnyOrder(2, 5, 13);
PAssert.that(sumPerKey).containsInAnyOrder(
- KV.of("a", "11a"),
- KV.of("a", "4a"),
- KV.of("b", "1b"),
- KV.of("b", "13b"));
+ KV.of("a", "11"),
+ KV.of("a", "4"),
+ KV.of("b", "1"),
+ KV.of("b", "13"));
pipeline.run();
}
@@ -286,23 +286,23 @@ public class CombineTest implements Serializable {
PCollectionView<Integer> globallySumView = sum.apply(View.<Integer>asSingleton());
- PCollection<KV<String, String>> combinePerKeyWithContext = perKeyInput
- .apply(Combine.perKey(new TestKeyedCombineFnWithContext(globallySumView))
- .withSideInputs(Arrays.asList(globallySumView)));
+ PCollection<KV<String, String>> combinePerKeyWithContext =
+ perKeyInput.apply(
+ Combine.<String, Integer, String>perKey(new TestCombineFnWithContext(globallySumView))
+ .withSideInputs(Arrays.asList(globallySumView)));
PCollection<String> combineGloballyWithContext = globallyInput
- .apply(Combine.globally(new TestKeyedCombineFnWithContext(globallySumView)
- .forKey("G", StringUtf8Coder.of()))
+ .apply(Combine.globally(new TestCombineFnWithContext(globallySumView))
.withoutDefaults()
.withSideInputs(Arrays.asList(globallySumView)));
PAssert.that(sum).containsInAnyOrder(2, 5, 13);
PAssert.that(combinePerKeyWithContext).containsInAnyOrder(
- KV.of("a", "112a"),
- KV.of("a", "45a"),
- KV.of("b", "15b"),
- KV.of("b", "1133b"));
- PAssert.that(combineGloballyWithContext).containsInAnyOrder("112G", "145G", "1133G");
+ KV.of("a", "112"),
+ KV.of("a", "45"),
+ KV.of("b", "15"),
+ KV.of("b", "1133"));
+ PAssert.that(combineGloballyWithContext).containsInAnyOrder("112", "145", "1133");
pipeline.run();
}
@@ -321,28 +321,28 @@ public class CombineTest implements Serializable {
PCollectionView<Integer> globallySumView = sum.apply(View.<Integer>asSingleton());
- PCollection<KV<String, String>> combinePerKeyWithContext = perKeyInput
- .apply(Combine.perKey(new TestKeyedCombineFnWithContext(globallySumView))
- .withSideInputs(Arrays.asList(globallySumView)));
+ PCollection<KV<String, String>> combinePerKeyWithContext =
+ perKeyInput.apply(
+ Combine.<String, Integer, String>perKey(new TestCombineFnWithContext(globallySumView))
+ .withSideInputs(Arrays.asList(globallySumView)));
PCollection<String> combineGloballyWithContext = globallyInput
- .apply(Combine.globally(new TestKeyedCombineFnWithContext(globallySumView)
- .forKey("G", StringUtf8Coder.of()))
+ .apply(Combine.globally(new TestCombineFnWithContext(globallySumView))
.withoutDefaults()
.withSideInputs(Arrays.asList(globallySumView)));
PAssert.that(sum).containsInAnyOrder(1, 2, 1, 4, 5, 14, 13);
PAssert.that(combinePerKeyWithContext).containsInAnyOrder(
- KV.of("a", "11a"),
- KV.of("a", "112a"),
- KV.of("a", "11a"),
- KV.of("a", "44a"),
- KV.of("a", "45a"),
- KV.of("b", "15b"),
- KV.of("b", "11134b"),
- KV.of("b", "1133b"));
+ KV.of("a", "11"),
+ KV.of("a", "112"),
+ KV.of("a", "11"),
+ KV.of("a", "44"),
+ KV.of("a", "45"),
+ KV.of("b", "15"),
+ KV.of("b", "11134"),
+ KV.of("b", "1133"));
PAssert.that(combineGloballyWithContext).containsInAnyOrder(
- "11G", "112G", "11G", "44G", "145G", "11134G", "1133G");
+ "11", "112", "11", "44", "145", "11134", "1133");
pipeline.run();
}
@@ -392,13 +392,13 @@ public class CombineTest implements Serializable {
.apply(Combine.globally(new SumInts()).withoutDefaults());
PCollection<KV<String, String>> sumPerKey = input
- .apply(Combine.perKey(new TestKeyedCombineFn()));
+ .apply(Combine.<String, Integer, String>perKey(new TestCombineFn()));
PAssert.that(sum).containsInAnyOrder(7, 13);
PAssert.that(sumPerKey).containsInAnyOrder(
- KV.of("a", "114a"),
- KV.of("b", "1b"),
- KV.of("b", "13b"));
+ KV.of("a", "114"),
+ KV.of("b", "1"),
+ KV.of("b", "13"));
pipeline.run();
}
@@ -419,26 +419,29 @@ public class CombineTest implements Serializable {
PCollectionView<Integer> globallyFixedWindowsView =
fixedWindowsSum.apply(View.<Integer>asSingleton().withDefaultValue(0));
- PCollection<KV<String, String>> sessionsCombinePerKey = perKeyInput
- .apply("PerKey Input Sessions",
- Window.<KV<String, Integer>>into(Sessions.withGapDuration(Duration.millis(5))))
- .apply(Combine.perKey(new TestKeyedCombineFnWithContext(globallyFixedWindowsView))
- .withSideInputs(Arrays.asList(globallyFixedWindowsView)));
+ PCollection<KV<String, String>> sessionsCombinePerKey =
+ perKeyInput
+ .apply(
+ "PerKey Input Sessions",
+ Window.<KV<String, Integer>>into(Sessions.withGapDuration(Duration.millis(5))))
+ .apply(
+ Combine.<String, Integer, String>perKey(
+ new TestCombineFnWithContext(globallyFixedWindowsView))
+ .withSideInputs(Arrays.asList(globallyFixedWindowsView)));
PCollection<String> sessionsCombineGlobally = globallyInput
.apply("Globally Input Sessions",
Window.<Integer>into(Sessions.withGapDuration(Duration.millis(5))))
- .apply(Combine.globally(new TestKeyedCombineFnWithContext(globallyFixedWindowsView)
- .forKey("G", StringUtf8Coder.of()))
+ .apply(Combine.globally(new TestCombineFnWithContext(globallyFixedWindowsView))
.withoutDefaults()
.withSideInputs(Arrays.asList(globallyFixedWindowsView)));
PAssert.that(fixedWindowsSum).containsInAnyOrder(2, 4, 1, 13);
PAssert.that(sessionsCombinePerKey).containsInAnyOrder(
- KV.of("a", "1114a"),
- KV.of("b", "11b"),
- KV.of("b", "013b"));
- PAssert.that(sessionsCombineGlobally).containsInAnyOrder("11114G", "013G");
+ KV.of("a", "1114"),
+ KV.of("b", "11"),
+ KV.of("b", "013"));
+ PAssert.that(sessionsCombineGlobally).containsInAnyOrder("11114", "013");
pipeline.run();
}
@@ -502,16 +505,15 @@ public class CombineTest implements Serializable {
public void testHotKeyCombining() {
PCollection<KV<String, Integer>> input = copy(createInput(pipeline, TABLE), 10);
- KeyedCombineFn<String, Integer, ?, Double> mean =
- new MeanInts().<String>asKeyedFn();
+ CombineFn<Integer, ?, Double> mean = new MeanInts();
PCollection<KV<String, Double>> coldMean = input.apply("ColdMean",
- Combine.perKey(mean).withHotKeyFanout(0));
+ Combine.<String, Integer, Double>perKey(mean).withHotKeyFanout(0));
PCollection<KV<String, Double>> warmMean = input.apply("WarmMean",
- Combine.perKey(mean).withHotKeyFanout(hotKeyFanout));
+ Combine.<String, Integer, Double>perKey(mean).withHotKeyFanout(hotKeyFanout));
PCollection<KV<String, Double>> hotMean = input.apply("HotMean",
- Combine.perKey(mean).withHotKeyFanout(5));
+ Combine.<String, Integer, Double>perKey(mean).withHotKeyFanout(5));
PCollection<KV<String, Double>> splitMean = input.apply("SplitMean",
- Combine.perKey(mean).withHotKeyFanout(splitHotKeyFanout));
+ Combine.<String, Integer, Double>perKey(mean).withHotKeyFanout(splitHotKeyFanout));
List<KV<String, Double>> expected = Arrays.asList(KV.of("a", 2.0), KV.of("b", 7.0));
PAssert.that(coldMean).containsInAnyOrder(expected);
@@ -678,10 +680,10 @@ public class CombineTest implements Serializable {
assertEquals(
"Combine.GloballyAsSingletonView",
Combine.globally(new SumInts()).asSingletonView().getName());
- assertEquals("Combine.perKey(TestKeyed)", Combine.perKey(new TestKeyedCombineFn()).getName());
+ assertEquals("Combine.perKey(Test)", Combine.perKey(new TestCombineFn()).getName());
assertEquals(
- "Combine.perKeyWithFanout(TestKeyed)",
- Combine.perKey(new TestKeyedCombineFn()).withHotKeyFanout(10).getName());
+ "Combine.perKeyWithFanout(Test)",
+ Combine.perKey(new TestCombineFn()).withHotKeyFanout(10).getName());
}
@Test
@@ -908,14 +910,10 @@ public class CombineTest implements Serializable {
}
/**
- * A KeyedCombineFn that exercises the full generality of [Keyed]CombineFn.
- *
- * <p>The net result of applying this CombineFn is a sorted list of all
- * characters occurring in the key and the decimal representations of
- * each value.
+ * A {@link CombineFn} that results in a sorted list of all characters occurring in the decimal
+ * representations of each input value.
*/
- public static class TestKeyedCombineFn
- extends KeyedCombineFn<String, Integer, TestKeyedCombineFn.Accumulator, String> {
+ public static class TestCombineFn extends CombineFn<Integer, TestCombineFn.Accumulator, String> {
// Not serializable.
static class Accumulator {
@@ -943,20 +941,18 @@ public class CombineTest implements Serializable {
@Override
public Coder<Accumulator> getAccumulatorCoder(
- CoderRegistry registry, Coder<String> keyCoder, Coder<Integer> inputCoder) {
+ CoderRegistry registry, Coder<Integer> inputCoder) {
return Accumulator.getCoder();
}
@Override
- public Accumulator createAccumulator(String key) {
- return new Accumulator(key);
+ public Accumulator createAccumulator() {
+ return new Accumulator("");
}
@Override
- public Accumulator addInput(String key, Accumulator accumulator, Integer value) {
- checkNotNull(key);
+ public Accumulator addInput(Accumulator accumulator, Integer value) {
try {
- assertThat(accumulator.value, Matchers.startsWith(key));
return new Accumulator(accumulator.value + String.valueOf(value));
} finally {
accumulator.value = "cleared in addInput";
@@ -964,19 +960,17 @@ public class CombineTest implements Serializable {
}
@Override
- public Accumulator mergeAccumulators(String key, Iterable<Accumulator> accumulators) {
- String all = key;
+ public Accumulator mergeAccumulators(Iterable<Accumulator> accumulators) {
+ String all = "";
for (Accumulator accumulator : accumulators) {
- assertThat(accumulator.value, Matchers.startsWith(key));
- all += accumulator.value.substring(key.length());
+ all += accumulator.value;
accumulator.value = "cleared in mergeAccumulators";
}
return new Accumulator(all);
}
@Override
- public String extractOutput(String key, Accumulator accumulator) {
- assertThat(accumulator.value, Matchers.startsWith(key));
+ public String extractOutput(Accumulator accumulator) {
char[] chars = accumulator.value.toCharArray();
Arrays.sort(chars);
return new String(chars);
@@ -984,38 +978,33 @@ public class CombineTest implements Serializable {
}
/**
- * A {@link KeyedCombineFnWithContext} that exercises the full generality
- * of [Keyed]CombineFnWithContext.
- *
- * <p>The net result of applying this CombineFn is a sorted list of all
- * characters occurring in the key and the decimal representations of
- * main and side inputs values.
+ * A {@link CombineFnWithContext} that produces a sorted list of all characters occurring in the
+ * decimal representations of the main input values and of the side input value.
*/
- public class TestKeyedCombineFnWithContext
- extends KeyedCombineFnWithContext<String, Integer, TestKeyedCombineFn.Accumulator, String> {
+ public class TestCombineFnWithContext extends CombineFnWithContext<Integer, Accumulator, String> {
private final PCollectionView<Integer> view;
- public TestKeyedCombineFnWithContext(PCollectionView<Integer> view) {
+ public TestCombineFnWithContext(PCollectionView<Integer> view) {
this.view = view;
}
@Override
- public Coder<TestKeyedCombineFn.Accumulator> getAccumulatorCoder(
- CoderRegistry registry, Coder<String> keyCoder, Coder<Integer> inputCoder) {
- return TestKeyedCombineFn.Accumulator.getCoder();
+ public Coder<TestCombineFn.Accumulator> getAccumulatorCoder(
+ CoderRegistry registry, Coder<Integer> inputCoder) {
+ return TestCombineFn.Accumulator.getCoder();
}
@Override
- public TestKeyedCombineFn.Accumulator createAccumulator(String key, Context c) {
- return new TestKeyedCombineFn.Accumulator(key + c.sideInput(view).toString());
+ public TestCombineFn.Accumulator createAccumulator(Context c) {
+ return new TestCombineFn.Accumulator(c.sideInput(view).toString());
}
@Override
- public TestKeyedCombineFn.Accumulator addInput(
- String key, TestKeyedCombineFn.Accumulator accumulator, Integer value, Context c) {
+ public TestCombineFn.Accumulator addInput(
+ TestCombineFn.Accumulator accumulator, Integer value, Context c) {
try {
- assertThat(accumulator.value, Matchers.startsWith(key + c.sideInput(view).toString()));
- return new TestKeyedCombineFn.Accumulator(accumulator.value + String.valueOf(value));
+ assertThat(accumulator.value, Matchers.startsWith(c.sideInput(view).toString()));
+ return new TestCombineFn.Accumulator(accumulator.value + String.valueOf(value));
} finally {
accumulator.value = "cleared in addInput";
}
@@ -1023,21 +1012,21 @@ public class CombineTest implements Serializable {
}
@Override
- public TestKeyedCombineFn.Accumulator mergeAccumulators(
- String key, Iterable<TestKeyedCombineFn.Accumulator> accumulators, Context c) {
- String keyPrefix = key + c.sideInput(view).toString();
- String all = keyPrefix;
- for (TestKeyedCombineFn.Accumulator accumulator : accumulators) {
- assertThat(accumulator.value, Matchers.startsWith(keyPrefix));
- all += accumulator.value.substring(keyPrefix.length());
+ public TestCombineFn.Accumulator mergeAccumulators(
+ Iterable<TestCombineFn.Accumulator> accumulators, Context c) {
+ String prefix = c.sideInput(view).toString();
+ String all = prefix;
+ for (TestCombineFn.Accumulator accumulator : accumulators) {
+ assertThat(accumulator.value, Matchers.startsWith(prefix));
+ all += accumulator.value.substring(prefix.length());
accumulator.value = "cleared in mergeAccumulators";
}
- return new TestKeyedCombineFn.Accumulator(all);
+ return new TestCombineFn.Accumulator(all);
}
@Override
- public String extractOutput(String key, TestKeyedCombineFn.Accumulator accumulator, Context c) {
- assertThat(accumulator.value, Matchers.startsWith(key + c.sideInput(view).toString()));
+ public String extractOutput(TestCombineFn.Accumulator accumulator, Context c) {
+ assertThat(accumulator.value, Matchers.startsWith(c.sideInput(view).toString()));
char[] chars = accumulator.value.toCharArray();
Arrays.sort(chars);
return new String(chars);
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 1a976f2..52b2f5e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -2501,7 +2501,7 @@ public class ParDoTest implements Serializable {
};
thrown.expect(RuntimeException.class);
- thrown.expectMessage("Unable to infer a coder for GroupingState and no Coder was specified.");
+ thrown.expectMessage("Unable to infer a coder for CombiningState and no Coder was specified.");
pipeline
.apply(Create.of(KV.of("hello", 3), KV.of("hello", 6), KV.of("hello", 7)))
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
index 867fe0a..b3fa2c6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
@@ -1122,7 +1122,7 @@ public class ViewTest implements Serializable {
final PCollectionView<Map<String, Integer>> view =
pipeline.apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("a", 20), KV.of("b", 3)))
- .apply("SumIntegers", Combine.perKey(Sum.ofIntegers().<String>asKeyedFn()))
+ .apply("SumIntegers", Combine.<String, Integer, Integer>perKey(Sum.ofIntegers()))
.apply(View.<String, Integer>asMap());
PCollection<KV<String, Integer>> output =
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java
index 36a90e9..798e8dc 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java
@@ -29,7 +29,6 @@ import java.io.ObjectOutputStream;
import java.util.List;
import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.CombineWithContext.Context;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.util.state.StateContexts;
import org.junit.Before;
@@ -48,12 +47,12 @@ public class CombineFnUtilTest {
@Rule
public ExpectedException expectedException = ExpectedException.none();
- KeyedCombineFnWithContext<Integer, Integer, Integer, Integer> mockCombineFn;
+ CombineFnWithContext<Integer, Integer, Integer> mockCombineFn;
@SuppressWarnings("unchecked")
@Before
public void setUp() {
- mockCombineFn = mock(KeyedCombineFnWithContext.class, withSettings().serializable());
+ mockCombineFn = mock(CombineFnWithContext.class, withSettings().serializable());
}
@Test
@@ -72,10 +71,6 @@ public class CombineFnUtilTest {
CombineFnWithContext<Integer, int[], Integer> fnWithContext =
CombineFnUtil.toFnWithContext(Sum.ofIntegers());
assertTrue(fnWithContext == CombineFnUtil.toFnWithContext(fnWithContext));
-
- KeyedCombineFnWithContext<Object, Integer, int[], Integer> keyedFnWithContext =
- CombineFnUtil.toFnWithContext(Sum.ofIntegers().asKeyedFn());
- assertTrue(keyedFnWithContext == CombineFnUtil.toFnWithContext(keyedFnWithContext));
}
@Test
@@ -89,14 +84,5 @@ public class CombineFnUtilTest {
accum = fnWithContext.addInput(accum, i, nullContext);
}
assertEquals(10, fnWithContext.extractOutput(accum, nullContext).intValue());
-
- KeyedCombineFnWithContext<String, Integer, int[], Integer> keyedFnWithContext =
- CombineFnUtil.toFnWithContext(Sum.ofIntegers().<String>asKeyedFn());
- String key = "key";
- accum = keyedFnWithContext.createAccumulator(key, nullContext);
- for (Integer i : inputs) {
- accum = keyedFnWithContext.addInput(key, accum, i, nullContext);
- }
- assertEquals(10, keyedFnWithContext.extractOutput(key, accum, nullContext).intValue());
}
}
[6/6] beam git commit: This closes #2636: Remove KeyedCombineFn
Posted by ke...@apache.org.
This closes #2636: Remove KeyedCombineFn
Update Dataflow worker version to beam-master-20170430
Remove KeyedCombineFn
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9f2733ac
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9f2733ac
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9f2733ac
Branch: refs/heads/master
Commit: 9f2733ac460ce42d6b3bd49f3db1bacb771ef85c
Parents: a198f8d 07ca542
Author: Kenneth Knowles <kl...@google.com>
Authored: Sun Apr 30 18:38:56 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Apr 30 18:38:56 2017 -0700
----------------------------------------------------------------------
.../translation/utils/ApexStateInternals.java | 38 +-
.../runners/core/GlobalCombineFnRunner.java | 78 +++
.../runners/core/GlobalCombineFnRunners.java | 193 ++++++
.../runners/core/InMemoryStateInternals.java | 50 +-
.../runners/core/PerKeyCombineFnRunner.java | 79 ---
.../runners/core/PerKeyCombineFnRunners.java | 161 -----
.../org/apache/beam/runners/core/StateTag.java | 18 +-
.../org/apache/beam/runners/core/StateTags.java | 43 +-
.../beam/runners/core/SystemReduceFn.java | 15 +-
.../beam/runners/core/ReduceFnRunnerTest.java | 36 +-
.../beam/runners/core/ReduceFnTester.java | 15 +-
.../apache/beam/runners/core/StateTagTest.java | 22 +-
.../CopyOnAccessInMemoryStateInternals.java | 66 +-
.../CopyOnAccessInMemoryStateInternalsTest.java | 34 -
.../flink/FlinkBatchTransformTranslators.java | 9 +-
.../functions/AbstractFlinkCombineRunner.java | 44 +-
.../FlinkMergingNonShuffleReduceFunction.java | 10 +-
.../functions/FlinkPartialReduceFunction.java | 6 +-
.../functions/FlinkReduceFunction.java | 10 +-
.../functions/SortingFlinkCombineRunner.java | 1 -
.../state/FlinkBroadcastStateInternals.java | 173 ++---
.../state/FlinkKeyGroupStateInternals.java | 119 ++--
.../state/FlinkSplitStateInternals.java | 119 ++--
.../streaming/state/FlinkStateInternals.java | 173 ++---
runners/google-cloud-dataflow-java/pom.xml | 2 +-
.../spark/stateful/SparkStateInternals.java | 40 +-
.../spark/translation/SparkKeyedCombineFn.java | 26 +-
.../spark/translation/TransformTranslator.java | 44 +-
.../streaming/StreamingTransformTranslator.java | 4 +-
.../runners/spark/SparkRunnerDebuggerTest.java | 7 +-
.../src/main/resources/beam/findbugs-filter.xml | 2 +-
.../sdk/transforms/ApproximateQuantiles.java | 8 +-
.../beam/sdk/transforms/ApproximateUnique.java | 3 +-
.../org/apache/beam/sdk/transforms/Combine.java | 672 +++++--------------
.../beam/sdk/transforms/CombineFnBase.java | 136 ----
.../apache/beam/sdk/transforms/CombineFns.java | 448 +------------
.../beam/sdk/transforms/CombineWithContext.java | 174 +----
.../org/apache/beam/sdk/transforms/Top.java | 6 +-
.../org/apache/beam/sdk/transforms/View.java | 2 +-
.../apache/beam/sdk/util/AppliedCombineFn.java | 35 +-
.../org/apache/beam/sdk/util/CombineFnUtil.java | 123 ++--
.../apache/beam/sdk/util/state/StateBinder.java | 19 +-
.../apache/beam/sdk/util/state/StateSpecs.java | 177 ++---
.../beam/sdk/transforms/CombineFnsTest.java | 114 ++--
.../apache/beam/sdk/transforms/CombineTest.java | 213 +++---
.../apache/beam/sdk/transforms/ParDoTest.java | 2 +-
.../apache/beam/sdk/transforms/ViewTest.java | 2 +-
.../apache/beam/sdk/util/CombineFnUtilTest.java | 18 +-
48 files changed, 1179 insertions(+), 2610 deletions(-)
----------------------------------------------------------------------
[5/6] beam git commit: Remove KeyedCombineFn
Posted by ke...@apache.org.
Remove KeyedCombineFn
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7e04924e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7e04924e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7e04924e
Branch: refs/heads/master
Commit: 7e04924ee7b31e28326f761618173749a55789d0
Parents: a198f8d
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Apr 21 14:04:02 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Apr 30 18:17:42 2017 -0700
----------------------------------------------------------------------
.../translation/utils/ApexStateInternals.java | 38 +-
.../runners/core/GlobalCombineFnRunner.java | 78 +++
.../runners/core/GlobalCombineFnRunners.java | 193 ++++++
.../runners/core/InMemoryStateInternals.java | 50 +-
.../runners/core/PerKeyCombineFnRunner.java | 79 ---
.../runners/core/PerKeyCombineFnRunners.java | 161 -----
.../org/apache/beam/runners/core/StateTag.java | 18 +-
.../org/apache/beam/runners/core/StateTags.java | 43 +-
.../beam/runners/core/SystemReduceFn.java | 15 +-
.../beam/runners/core/ReduceFnRunnerTest.java | 36 +-
.../beam/runners/core/ReduceFnTester.java | 15 +-
.../apache/beam/runners/core/StateTagTest.java | 22 +-
.../CopyOnAccessInMemoryStateInternals.java | 66 +-
.../CopyOnAccessInMemoryStateInternalsTest.java | 34 -
.../flink/FlinkBatchTransformTranslators.java | 9 +-
.../functions/AbstractFlinkCombineRunner.java | 44 +-
.../FlinkMergingNonShuffleReduceFunction.java | 10 +-
.../functions/FlinkPartialReduceFunction.java | 6 +-
.../functions/FlinkReduceFunction.java | 10 +-
.../functions/SortingFlinkCombineRunner.java | 1 -
.../state/FlinkBroadcastStateInternals.java | 173 ++---
.../state/FlinkKeyGroupStateInternals.java | 119 ++--
.../state/FlinkSplitStateInternals.java | 119 ++--
.../streaming/state/FlinkStateInternals.java | 173 ++---
.../spark/stateful/SparkStateInternals.java | 40 +-
.../spark/translation/SparkKeyedCombineFn.java | 26 +-
.../spark/translation/TransformTranslator.java | 44 +-
.../streaming/StreamingTransformTranslator.java | 4 +-
.../runners/spark/SparkRunnerDebuggerTest.java | 7 +-
.../src/main/resources/beam/findbugs-filter.xml | 2 +-
.../sdk/transforms/ApproximateQuantiles.java | 8 +-
.../beam/sdk/transforms/ApproximateUnique.java | 3 +-
.../org/apache/beam/sdk/transforms/Combine.java | 672 +++++--------------
.../beam/sdk/transforms/CombineFnBase.java | 136 ----
.../apache/beam/sdk/transforms/CombineFns.java | 448 +------------
.../beam/sdk/transforms/CombineWithContext.java | 174 +----
.../org/apache/beam/sdk/transforms/Top.java | 6 +-
.../org/apache/beam/sdk/transforms/View.java | 2 +-
.../apache/beam/sdk/util/AppliedCombineFn.java | 35 +-
.../org/apache/beam/sdk/util/CombineFnUtil.java | 123 ++--
.../apache/beam/sdk/util/state/StateBinder.java | 19 +-
.../apache/beam/sdk/util/state/StateSpecs.java | 177 ++---
.../beam/sdk/transforms/CombineFnsTest.java | 114 ++--
.../apache/beam/sdk/transforms/CombineTest.java | 213 +++---
.../apache/beam/sdk/transforms/ParDoTest.java | 2 +-
.../apache/beam/sdk/transforms/ViewTest.java | 2 +-
.../apache/beam/sdk/util/CombineFnUtilTest.java | 18 +-
47 files changed, 1178 insertions(+), 2609 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
index ec8f666..e682894 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
@@ -42,8 +42,7 @@ import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CoderUtils;
@@ -145,7 +144,7 @@ public class ApexStateInternals<K> implements StateInternals<K> {
address,
accumCoder,
key,
- combineFn.<K>asKeyedFn()
+ combineFn
);
}
@@ -158,24 +157,11 @@ public class ApexStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
- bindKeyedCombiningValue(
+ bindCombiningValueWithContext(
StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
- KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
- return new ApexCombiningState<>(
- namespace,
- address,
- accumCoder,
- key, combineFn);
- }
-
- @Override
- public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
- bindKeyedCombiningValueWithContext(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
- return bindKeyedCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
+ CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+ return bindCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
}
}
@@ -323,12 +309,12 @@ public class ApexStateInternals<K> implements StateInternals<K> {
extends AbstractState<AccumT>
implements CombiningState<InputT, AccumT, OutputT> {
private final K key;
- private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
+ private final CombineFn<InputT, AccumT, OutputT> combineFn;
private ApexCombiningState(StateNamespace namespace,
StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> coder,
- K key, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
+ K key, CombineFn<InputT, AccumT, OutputT> combineFn) {
super(namespace, address, coder);
this.key = key;
this.combineFn = combineFn;
@@ -341,13 +327,13 @@ public class ApexStateInternals<K> implements StateInternals<K> {
@Override
public OutputT read() {
- return combineFn.extractOutput(key, getAccum());
+ return combineFn.extractOutput(getAccum());
}
@Override
public void add(InputT input) {
AccumT accum = getAccum();
- combineFn.addInput(key, accum, input);
+ combineFn.addInput(accum, input);
writeValue(accum);
}
@@ -355,7 +341,7 @@ public class ApexStateInternals<K> implements StateInternals<K> {
public AccumT getAccum() {
AccumT accum = readValue();
if (accum == null) {
- accum = combineFn.createAccumulator(key);
+ accum = combineFn.createAccumulator();
}
return accum;
}
@@ -376,13 +362,13 @@ public class ApexStateInternals<K> implements StateInternals<K> {
@Override
public void addAccum(AccumT accum) {
- accum = combineFn.mergeAccumulators(key, Arrays.asList(getAccum(), accum));
+ accum = combineFn.mergeAccumulators(Arrays.asList(getAccum(), accum));
writeValue(accum);
}
@Override
public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
- return combineFn.mergeAccumulators(key, accumulators);
+ return combineFn.mergeAccumulators(accumulators);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunner.java
new file mode 100644
index 0000000..5325ba6
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunner.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.io.Serializable;
+import java.util.Collection;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.SideInputReader;
+
+/**
+ * An interface that runs a {@link GlobalCombineFn} with unified APIs.
+ *
+ * <p>Different combine functions have their own implementations. For example, an implementation
+ * can skip building a {@code CombineWithContext.Context} if the combine function does not use one.
+ */
+public interface GlobalCombineFnRunner<InputT, AccumT, OutputT> extends Serializable {
+ /**
+ * Forwards the call to a {@link GlobalCombineFn} to create an accumulator, and returns it.
+ *
+ * <p>If the function requires one, a {@code CombineWithContext.Context} is built from the
+ * {@link PipelineOptions}, the {@link SideInputReader} and the single window in {@code windows}.
+ */
+ AccumT createAccumulator(PipelineOptions options,
+ SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
+
+ /**
+ * Forwards the call to a {@link GlobalCombineFn} to add {@code value}; returns the accumulator.
+ *
+ * <p>If the function requires one, a {@code CombineWithContext.Context} is built from the
+ * {@link PipelineOptions}, the {@link SideInputReader} and the single window in {@code windows}.
+ */
+ AccumT addInput(AccumT accumulator, InputT value, PipelineOptions options,
+ SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
+
+ /**
+ * Forwards the call to a {@link GlobalCombineFn} to merge {@code accumulators} into one.
+ *
+ * <p>If the function requires one, a {@code CombineWithContext.Context} is built from the
+ * {@link PipelineOptions}, the {@link SideInputReader} and the single window in {@code windows}.
+ */
+ AccumT mergeAccumulators(Iterable<AccumT> accumulators, PipelineOptions options,
+ SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
+
+ /**
+ * Forwards the call to a {@link GlobalCombineFn} to extract the output from {@code accumulator}.
+ *
+ * <p>If the function requires one, a {@code CombineWithContext.Context} is built from the
+ * {@link PipelineOptions}, the {@link SideInputReader} and the single window in {@code windows}.
+ */
+ OutputT extractOutput(AccumT accumulator, PipelineOptions options,
+ SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
+
+ /**
+ * Forwards the call to a {@link GlobalCombineFn} to compact {@code accumulator}; returns the result.
+ *
+ * <p>If the function requires one, a {@code CombineWithContext.Context} is built from the
+ * {@link PipelineOptions}, the {@link SideInputReader} and the single window in {@code windows}.
+ */
+ AccumT compact(AccumT accumulator, PipelineOptions options,
+ SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunners.java
new file mode 100644
index 0000000..d45b503
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunners.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.CombineContextFactory;
+import org.apache.beam.sdk.util.SideInputReader;
+
+/**
+ * Static utility methods that provide {@link GlobalCombineFnRunner} implementations for different
+ * combine functions.
+ *
+ * <p>{@link #create} picks the implementation from the runtime type of the given function: a
+ * {@link CombineFn} is called directly and never reads the options, side input reader or windows,
+ * while a {@link CombineFnWithContext} receives a context built from them on every call.
+ *
+ * <p>Every runner returned here is {@link java.io.Serializable} (as required by
+ * {@link GlobalCombineFnRunner}) and keeps only the wrapped function as state.
+ */
+public final class GlobalCombineFnRunners {
+  /** Not instantiable: this class only holds static factory methods. */
+  private GlobalCombineFnRunners() {}
+
+  /**
+   * Returns a {@link GlobalCombineFnRunner} from a {@link GlobalCombineFn}.
+   *
+   * @param globalCombineFn the function to wrap; must be a {@link CombineFnWithContext} or a
+   *     {@link CombineFn}
+   * @return a runner that forwards every call to {@code globalCombineFn}
+   * @throws IllegalStateException if {@code globalCombineFn} is of any other type
+   */
+  public static <InputT, AccumT, OutputT> GlobalCombineFnRunner<InputT, AccumT, OutputT> create(
+      GlobalCombineFn<InputT, AccumT, OutputT> globalCombineFn) {
+    if (globalCombineFn instanceof CombineFnWithContext) {
+      return new CombineFnWithContextRunner<>(
+          (CombineFnWithContext<InputT, AccumT, OutputT>) globalCombineFn);
+    } else if (globalCombineFn instanceof CombineFn) {
+      return new CombineFnRunner<>((CombineFn<InputT, AccumT, OutputT>) globalCombineFn);
+    } else {
+      // Neither supported kind: fail loudly rather than guess how to invoke the function.
+      throw new IllegalStateException(
+          String.format("Unknown type of CombineFn: %s", globalCombineFn.getClass()));
+    }
+  }
+
+  /**
+   * An implementation of {@link GlobalCombineFnRunner} with {@link CombineFn}.
+   *
+   * <p>It forwards function calls to the {@link CombineFn}. The options, side input reader and
+   * windows are accepted only to satisfy the interface and are never read.
+   */
+  private static class CombineFnRunner<InputT, AccumT, OutputT>
+      implements GlobalCombineFnRunner<InputT, AccumT, OutputT> {
+    private final CombineFn<InputT, AccumT, OutputT> combineFn;
+
+    /** Wraps {@code combineFn}; every runner method delegates to it. */
+    private CombineFnRunner(CombineFn<InputT, AccumT, OutputT> combineFn) {
+      this.combineFn = combineFn;
+    }
+
+    @Override
+    public String toString() {
+      return combineFn.toString();
+    }
+
+    @Override
+    public AccumT createAccumulator(PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return combineFn.createAccumulator();
+    }
+
+    @Override
+    public AccumT addInput(AccumT accumulator, InputT input, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return combineFn.addInput(accumulator, input);
+    }
+
+    @Override
+    public AccumT mergeAccumulators(Iterable<AccumT> accumulators, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return combineFn.mergeAccumulators(accumulators);
+    }
+
+    @Override
+    public OutputT extractOutput(AccumT accumulator, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return combineFn.extractOutput(accumulator);
+    }
+
+    @Override
+    public AccumT compact(AccumT accumulator, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return combineFn.compact(accumulator);
+    }
+  }
+
+  /**
+   * An implementation of {@link GlobalCombineFnRunner} with {@link CombineFnWithContext}.
+   *
+   * <p>It forwards function calls to the {@link CombineFnWithContext}, passing a context that is
+   * rebuilt on every call from the options, the side input reader and the single window in
+   * {@code windows}.
+   */
+  private static class CombineFnWithContextRunner<InputT, AccumT, OutputT>
+      implements GlobalCombineFnRunner<InputT, AccumT, OutputT> {
+    private final CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext;
+
+    /** Wraps {@code combineFnWithContext}; every runner method delegates to it. */
+    private CombineFnWithContextRunner(
+        CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext) {
+      this.combineFnWithContext = combineFnWithContext;
+    }
+
+    @Override
+    public String toString() {
+      return combineFnWithContext.toString();
+    }
+
+    /**
+     * Builds the context handed to the wrapped {@link CombineFnWithContext}; shared by every
+     * runner method so the construction logic lives in one place.
+     *
+     * @param options forwarded to {@link CombineContextFactory#createFromComponents}
+     * @param sideInputReader forwarded to {@link CombineContextFactory#createFromComponents}
+     * @param windows must hold exactly one window: {@link Iterables#getOnlyElement} throws
+     *     {@link java.util.NoSuchElementException} if it is empty and
+     *     {@link IllegalArgumentException} if it holds more than one
+     * @return the context produced by {@link CombineContextFactory#createFromComponents}
+     */
+    private static org.apache.beam.sdk.transforms.CombineWithContext.Context context(
+        PipelineOptions options,
+        SideInputReader sideInputReader,
+        Collection<? extends BoundedWindow> windows) {
+      return CombineContextFactory.createFromComponents(
+          options, sideInputReader, Iterables.getOnlyElement(windows));
+    }
+
+    @Override
+    public AccumT createAccumulator(PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return combineFnWithContext.createAccumulator(context(options, sideInputReader, windows));
+    }
+
+    @Override
+    public AccumT addInput(AccumT accumulator, InputT input, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return combineFnWithContext.addInput(
+          accumulator, input, context(options, sideInputReader, windows));
+    }
+
+    @Override
+    public AccumT mergeAccumulators(Iterable<AccumT> accumulators, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return combineFnWithContext.mergeAccumulators(
+          accumulators, context(options, sideInputReader, windows));
+    }
+
+    @Override
+    public OutputT extractOutput(AccumT accumulator, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return combineFnWithContext.extractOutput(
+          accumulator, context(options, sideInputReader, windows));
+    }
+
+    @Override
+    public AccumT compact(AccumT accumulator, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return combineFnWithContext.compact(
+          accumulator, context(options, sideInputReader, windows));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
index 9fb8e3f..2c02282 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
@@ -31,8 +31,7 @@ import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CombineFnUtil;
@@ -152,7 +151,7 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
final CombineFn<InputT, AccumT, OutputT> combineFn) {
- return new InMemoryCombiningState<K, InputT, AccumT, OutputT>(key, combineFn.<K>asKeyedFn());
+ return new InMemoryCombiningState<>(combineFn);
}
@Override
@@ -164,20 +163,11 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
- bindKeyedCombiningValue(
+ bindCombiningValueWithContext(
StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
- KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
- return new InMemoryCombiningState<K, InputT, AccumT, OutputT>(key, combineFn);
- }
-
- @Override
- public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
- bindKeyedCombiningValueWithContext(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
- return bindKeyedCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
+ CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+ return bindCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
}
}
@@ -310,23 +300,21 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
/**
* An {@link InMemoryState} implementation of {@link CombiningState}.
*/
- public static final class InMemoryCombiningState<K, InputT, AccumT, OutputT>
+ public static final class InMemoryCombiningState<InputT, AccumT, OutputT>
implements CombiningState<InputT, AccumT, OutputT>,
- InMemoryState<InMemoryCombiningState<K, InputT, AccumT, OutputT>> {
- private final K key;
+ InMemoryState<InMemoryCombiningState<InputT, AccumT, OutputT>> {
private boolean isCleared = true;
- private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
+ private final CombineFn<InputT, AccumT, OutputT> combineFn;
private AccumT accum;
public InMemoryCombiningState(
- K key, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
- this.key = key;
+ CombineFn<InputT, AccumT, OutputT> combineFn) {
this.combineFn = combineFn;
- accum = combineFn.createAccumulator(key);
+ accum = combineFn.createAccumulator();
}
@Override
- public InMemoryCombiningState<K, InputT, AccumT, OutputT> readLater() {
+ public InMemoryCombiningState<InputT, AccumT, OutputT> readLater() {
return this;
}
@@ -334,19 +322,19 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
public void clear() {
// Even though we're clearing we can't remove this from the in-memory state map, since
// other users may already have a handle on this CombiningValue.
- accum = combineFn.createAccumulator(key);
+ accum = combineFn.createAccumulator();
isCleared = true;
}
@Override
public OutputT read() {
- return combineFn.extractOutput(key, accum);
+ return combineFn.extractOutput(accum);
}
@Override
public void add(InputT input) {
isCleared = false;
- accum = combineFn.addInput(key, accum, input);
+ accum = combineFn.addInput(accum, input);
}
@Override
@@ -371,12 +359,12 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
@Override
public void addAccum(AccumT accum) {
isCleared = false;
- this.accum = combineFn.mergeAccumulators(key, Arrays.asList(this.accum, accum));
+ this.accum = combineFn.mergeAccumulators(Arrays.asList(this.accum, accum));
}
@Override
public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
- return combineFn.mergeAccumulators(key, accumulators);
+ return combineFn.mergeAccumulators(accumulators);
}
@Override
@@ -385,9 +373,9 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
}
@Override
- public InMemoryCombiningState<K, InputT, AccumT, OutputT> copy() {
- InMemoryCombiningState<K, InputT, AccumT, OutputT> that =
- new InMemoryCombiningState<>(key, combineFn);
+ public InMemoryCombiningState<InputT, AccumT, OutputT> copy() {
+ InMemoryCombiningState<InputT, AccumT, OutputT> that =
+ new InMemoryCombiningState<>(combineFn);
if (!this.isCleared) {
that.isCleared = this.isCleared;
that.addAccum(accum);
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java
deleted file mode 100644
index a6608a7..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import java.io.Serializable;
-import java.util.Collection;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.SideInputReader;
-
-/**
- * An interface that runs a {@link PerKeyCombineFn} with unified APIs.
- *
- * <p>Different keyed combine functions have their own implementations.
- * For example, the implementation can skip allocating {@code Combine.Context},
- * if the keyed combine function doesn't use it.
- */
-public interface PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> extends Serializable {
- /**
- * Forwards the call to a {@link PerKeyCombineFn} to create the accumulator.
- *
- * <p>It constructs a {@code CombineWithContext.Context} from
- * {@link PipelineOptions} and {@link SideInputReader} if it is required.
- */
- AccumT createAccumulator(K key, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
-
- /**
- * Forwards the call to a {@link PerKeyCombineFn} to add the input.
- *
- * <p>It constructs a {@code CombineWithContext.Context} from
- * {@link PipelineOptions} and {@link SideInputReader} if it is required.
- */
- AccumT addInput(K key, AccumT accumulator, InputT value, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
-
- /**
- * Forwards the call to a {@link PerKeyCombineFn} to merge accumulators.
- *
- * <p>It constructs a {@code CombineWithContext.Context} from
- * {@link PipelineOptions} and {@link SideInputReader} if it is required.
- */
- AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
-
- /**
- * Forwards the call to a {@link PerKeyCombineFn} to extract the output.
- *
- * <p>It constructs a {@code CombineWithContext.Context} from
- * {@link PipelineOptions} and {@link SideInputReader} if it is required.
- */
- OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
-
- /**
- * Forwards the call to a {@link PerKeyCombineFn} to compact the accumulator.
- *
- * <p>It constructs a {@code CombineWithContext.Context} from
- * {@link PipelineOptions} and {@link SideInputReader} if it is required.
- */
- AccumT compact(K key, AccumT accumulator, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java
deleted file mode 100644
index 7736758..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import com.google.common.collect.Iterables;
-import java.util.Collection;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.CombineContextFactory;
-import org.apache.beam.sdk.util.SideInputReader;
-
-/**
- * Static utility methods that provide {@link PerKeyCombineFnRunner} implementations
- * for different keyed combine functions.
- */
-public class PerKeyCombineFnRunners {
- /**
- * Returns a {@link PerKeyCombineFnRunner} from a {@link PerKeyCombineFn}.
- */
- public static <K, InputT, AccumT, OutputT> PerKeyCombineFnRunner<K, InputT, AccumT, OutputT>
- create(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) {
- if (perKeyCombineFn instanceof KeyedCombineFnWithContext) {
- return new KeyedCombineFnWithContextRunner<>(
- (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) perKeyCombineFn);
- } else if (perKeyCombineFn instanceof KeyedCombineFn) {
- return new KeyedCombineFnRunner<>(
- (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn);
- } else {
- throw new IllegalStateException(
- String.format("Unknown type of CombineFn: %s", perKeyCombineFn.getClass()));
- }
- }
-
- /**
- * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFn}.
- *
- * <p>It forwards functions calls to the {@link KeyedCombineFn}.
- */
- private static class KeyedCombineFnRunner<K, InputT, AccumT, OutputT>
- implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
- private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn;
-
- private KeyedCombineFnRunner(
- KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) {
- this.keyedCombineFn = keyedCombineFn;
- }
-
- @Override
- public String toString() {
- return keyedCombineFn.toString();
- }
-
- @Override
- public AccumT createAccumulator(K key, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFn.createAccumulator(key);
- }
-
- @Override
- public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFn.addInput(key, accumulator, input);
- }
-
- @Override
- public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFn.mergeAccumulators(key, accumulators);
- }
-
- @Override
- public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFn.extractOutput(key, accumulator);
- }
-
- @Override
- public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFn.compact(key, accumulator);
- }
- }
-
- /**
- * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFnWithContext}.
- *
- * <p>It forwards functions calls to the {@link KeyedCombineFnWithContext}.
- */
- private static class KeyedCombineFnWithContextRunner<K, InputT, AccumT, OutputT>
- implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
- private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext;
-
- private KeyedCombineFnWithContextRunner(
- KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext) {
- this.keyedCombineFnWithContext = keyedCombineFnWithContext;
- }
-
- @Override
- public String toString() {
- return keyedCombineFnWithContext.toString();
- }
-
- @Override
- public AccumT createAccumulator(K key, PipelineOptions options, SideInputReader sideInputReader,
- Collection<? extends BoundedWindow> windows) {
- return keyedCombineFnWithContext.createAccumulator(key,
- CombineContextFactory.createFromComponents(
- options, sideInputReader, Iterables.getOnlyElement(windows)));
- }
-
- @Override
- public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFnWithContext.addInput(key, accumulator, input,
- CombineContextFactory.createFromComponents(
- options, sideInputReader, Iterables.getOnlyElement(windows)));
- }
-
- @Override
- public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFnWithContext.mergeAccumulators(key, accumulators,
- CombineContextFactory.createFromComponents(
- options, sideInputReader, Iterables.getOnlyElement(windows)));
- }
-
- @Override
- public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFnWithContext.extractOutput(key, accumulator,
- CombineContextFactory.createFromComponents(
- options, sideInputReader, Iterables.getOnlyElement(windows)));
- }
-
- @Override
- public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFnWithContext.compact(key, accumulator,
- CombineContextFactory.createFromComponents(
- options, sideInputReader, Iterables.getOnlyElement(windows)));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
index a5d262a..aaeecf0 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
@@ -23,8 +23,7 @@ import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
@@ -100,17 +99,10 @@ public interface StateTag<K, StateT extends State> extends Serializable {
CombineFn<InputT, AccumT, OutputT> combineFn);
<InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
- Coder<AccumT> accumCoder,
- KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn);
-
- <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
- Coder<AccumT> accumCoder,
- KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT>
- combineFn);
+ CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
+ Coder<AccumT> accumCoder,
+ CombineFnWithContext<InputT, AccumT, OutputT> combineFn);
/**
* Bind to a watermark {@link StateSpec}.
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
index 2b3f4b8..fe99f27 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
@@ -26,8 +26,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.state.BagState;
@@ -90,22 +89,12 @@ public class StateTags {
@Override
public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindKeyedCombining(
+ CombiningState<InputT, AccumT, OutputT> bindCombiningWithContext(
String id,
StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
- KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
- return binder.bindKeyedCombiningValue(tagForSpec(id, spec), accumCoder, combineFn);
- }
-
- @Override
- public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningWithContext(
- String id,
- StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
- Coder<AccumT> accumCoder,
- KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
- return binder.bindKeyedCombiningValueWithContext(
+ CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+ return binder.bindCombiningValueWithContext(
tagForSpec(id, spec), accumCoder, combineFn);
}
@@ -162,29 +151,17 @@ public class StateTags {
}
/**
- * Create a state tag for values that use a {@link KeyedCombineFn} to automatically merge
- * multiple {@code InputT}s into a single {@code OutputT}.
- */
- public static <K, InputT, AccumT,
- OutputT> StateTag<K, CombiningState<InputT, AccumT, OutputT>>
- keyedCombiningValue(String id, Coder<AccumT> accumCoder,
- KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
- return new SimpleStateTag<>(
- new StructuredId(id), StateSpecs.keyedCombining(accumCoder, combineFn));
- }
-
- /**
- * Create a state tag for values that use a {@link KeyedCombineFnWithContext} to automatically
+ * Create a state tag for values that use a {@link CombineFnWithContext} to automatically
* merge multiple {@code InputT}s into a single {@code OutputT}.
*/
- public static <K, InputT, AccumT, OutputT>
- StateTag<K, CombiningState<InputT, AccumT, OutputT>>
- keyedCombiningValueWithContext(
+ public static <InputT, AccumT, OutputT>
+ StateTag<Object, CombiningState<InputT, AccumT, OutputT>>
+ combiningValueWithContext(
String id,
Coder<AccumT> accumCoder,
- KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
+ CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
return new SimpleStateTag<>(
- new StructuredId(id), StateSpecs.keyedCombiningWithContext(accumCoder, combineFn));
+ new StructuredId(id), StateSpecs.combining(accumCoder, combineFn));
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
index f618d88..86a7fd7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
@@ -20,8 +20,7 @@ package org.apache.beam.runners.core;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.AppliedCombineFn;
@@ -71,18 +70,18 @@ public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends Bound
AccumT, OutputT, W>
combining(
final Coder<K> keyCoder, final AppliedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
- final StateTag<K, CombiningState<InputT, AccumT, OutputT>> bufferTag;
- if (combineFn.getFn() instanceof KeyedCombineFnWithContext) {
+ final StateTag<Object, CombiningState<InputT, AccumT, OutputT>> bufferTag;
+ if (combineFn.getFn() instanceof CombineFnWithContext) {
bufferTag = StateTags.makeSystemTagInternal(
- StateTags.<K, InputT, AccumT, OutputT>keyedCombiningValueWithContext(
+ StateTags.<InputT, AccumT, OutputT>combiningValueWithContext(
BUFFER_NAME, combineFn.getAccumulatorCoder(),
- (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) combineFn.getFn()));
+ (CombineFnWithContext<InputT, AccumT, OutputT>) combineFn.getFn()));
} else {
bufferTag = StateTags.makeSystemTagInternal(
- StateTags.<K, InputT, AccumT, OutputT>keyedCombiningValue(
+ StateTags.<InputT, AccumT, OutputT>combiningValue(
BUFFER_NAME, combineFn.getAccumulatorCoder(),
- (KeyedCombineFn<K, InputT, AccumT, OutputT>) combineFn.getFn()));
+ (CombineFn<InputT, AccumT, OutputT>) combineFn.getFn()));
}
return new SystemReduceFn<K, InputT, AccumT, OutputT, W>(bufferTag) {
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 44bc538..ec2e7a3 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -218,7 +218,7 @@ public class ReduceFnRunnerTest {
ReduceFnTester.combining(
strategy,
mockTriggerStateMachine,
- Sum.ofIntegers().<String>asKeyedFn(),
+ Sum.ofIntegers(),
VarIntCoder.of());
injectElement(tester, 2);
@@ -291,7 +291,7 @@ public class ReduceFnRunnerTest {
ReduceFnTester<Integer, Integer, IntervalWindow> tester =
ReduceFnTester
- .combining(strategy, Sum.ofIntegers().<String>asKeyedFn(), VarIntCoder.of());
+ .combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
tester.injectElements(TimestampedValue.of(13, elementTimestamp));
@@ -323,7 +323,7 @@ public class ReduceFnRunnerTest {
ReduceFnTester.combining(
strategy,
mockTriggerStateMachine,
- Sum.ofIntegers().<String>asKeyedFn(),
+ Sum.ofIntegers(),
VarIntCoder.of());
injectElement(tester, 1);
@@ -387,9 +387,14 @@ public class ReduceFnRunnerTest {
});
SumAndVerifyContextFn combineFn = new SumAndVerifyContextFn(mockView, expectedValue);
- ReduceFnTester<Integer, Integer, IntervalWindow> tester = ReduceFnTester.combining(
- mainInputWindowingStrategy, mockTriggerStateMachine, combineFn.<String>asKeyedFn(),
- VarIntCoder.of(), options, mockSideInputReader);
+ ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+ ReduceFnTester.combining(
+ mainInputWindowingStrategy,
+ mockTriggerStateMachine,
+ combineFn,
+ VarIntCoder.of(),
+ options,
+ mockSideInputReader);
when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
for (int i = 0; i < 8; ++i) {
@@ -1062,12 +1067,13 @@ public class ReduceFnRunnerTest {
*/
@Test
public void testDropDataMultipleWindowsFinishedTrigger() throws Exception {
- ReduceFnTester<Integer, Integer, IntervalWindow> tester = ReduceFnTester.combining(
- WindowingStrategy.of(
- SlidingWindows.of(Duration.millis(100)).every(Duration.millis(30)))
- .withTrigger(AfterWatermark.pastEndOfWindow())
- .withAllowedLateness(Duration.millis(1000)),
- Sum.ofIntegers().<String>asKeyedFn(), VarIntCoder.of());
+ ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+ ReduceFnTester.combining(
+ WindowingStrategy.of(SlidingWindows.of(Duration.millis(100)).every(Duration.millis(30)))
+ .withTrigger(AfterWatermark.pastEndOfWindow())
+ .withAllowedLateness(Duration.millis(1000)),
+ Sum.ofIntegers(),
+ VarIntCoder.of());
tester.injectElements(
// assigned to [-60, 40), [-30, 70), [0, 100)
@@ -1209,8 +1215,7 @@ public class ReduceFnRunnerTest {
.withAllowedLateness(Duration.millis(100));
ReduceFnTester<Integer, Integer, IntervalWindow> tester =
- ReduceFnTester
- .combining(strategy, Sum.ofIntegers().<String>asKeyedFn(), VarIntCoder.of());
+ ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
tester.advanceInputWatermark(new Instant(0));
tester.advanceProcessingTime(new Instant(0));
@@ -1265,8 +1270,7 @@ public class ReduceFnRunnerTest {
.withAllowedLateness(Duration.millis(100));
ReduceFnTester<Integer, Integer, IntervalWindow> tester =
- ReduceFnTester
- .combining(strategy, Sum.ofIntegers().<String>asKeyedFn(), VarIntCoder.of());
+ ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
tester.advanceInputWatermark(new Instant(0));
tester.advanceProcessingTime(new Instant(0));
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index b5b5492..dfb769f 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -53,8 +53,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -170,13 +169,13 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
/**
* Creates a {@link ReduceFnTester} for the given {@link WindowingStrategy} and
- * {@link KeyedCombineFn}, creating a {@link TriggerStateMachine} from the
+ * {@link CombineFn}, creating a {@link TriggerStateMachine} from the
* {@link Trigger} in the {@link WindowingStrategy}.
*/
public static <W extends BoundedWindow, AccumT, OutputT>
ReduceFnTester<Integer, OutputT, W> combining(
WindowingStrategy<?, W> strategy,
- KeyedCombineFn<String, Integer, AccumT, OutputT> combineFn,
+ CombineFn<Integer, AccumT, OutputT> combineFn,
Coder<OutputT> outputCoder)
throws Exception {
@@ -194,7 +193,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
/**
* Creates a {@link ReduceFnTester} for the given {@link WindowingStrategy},
- * {@link KeyedCombineFn}, and {@link TriggerStateMachine}, for mocking the interaction
+ * {@link CombineFn}, and {@link TriggerStateMachine}, for mocking the interaction
* between {@link ReduceFnRunner} and the {@link TriggerStateMachine}.
* Ignores the {@link Trigger} in the {@link WindowingStrategy}.
*/
@@ -202,7 +201,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
ReduceFnTester<Integer, OutputT, W> combining(
WindowingStrategy<?, W> strategy,
TriggerStateMachine triggerStateMachine,
- KeyedCombineFn<String, Integer, AccumT, OutputT> combineFn,
+ CombineFn<Integer, AccumT, OutputT> combineFn,
Coder<OutputT> outputCoder)
throws Exception {
@@ -223,7 +222,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
public static <W extends BoundedWindow, AccumT, OutputT>
ReduceFnTester<Integer, OutputT, W> combining(
WindowingStrategy<?, W> strategy,
- KeyedCombineFnWithContext<String, Integer, AccumT, OutputT> combineFn,
+ CombineFnWithContext<Integer, AccumT, OutputT> combineFn,
Coder<OutputT> outputCoder,
PipelineOptions options,
SideInputReader sideInputReader)
@@ -246,7 +245,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
ReduceFnTester<Integer, OutputT, W> combining(
WindowingStrategy<?, W> strategy,
TriggerStateMachine triggerStateMachine,
- KeyedCombineFnWithContext<String, Integer, AccumT, OutputT> combineFn,
+ CombineFnWithContext<Integer, AccumT, OutputT> combineFn,
Coder<OutputT> outputCoder,
PipelineOptions options,
SideInputReader sideInputReader)
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
index 10dcb62..9a8b75c 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
@@ -162,17 +162,17 @@ public class StateTagTest {
Coder<int[]> accum1 = maxFn.getAccumulatorCoder(registry, VarIntCoder.of());
Coder<int[]> accum2 = minFn.getAccumulatorCoder(registry, BigEndianIntegerCoder.of());
- StateTag<?, ?> fooCoder1Max1 = StateTags.keyedCombiningValueWithContext(
- "foo", accum1, CombineFnUtil.toFnWithContext(maxFn).<String>asKeyedFn());
- StateTag<?, ?> fooCoder1Max2 = StateTags.keyedCombiningValueWithContext(
- "foo", accum1, CombineFnUtil.toFnWithContext(maxFn).asKeyedFn());
- StateTag<?, ?> fooCoder1Min = StateTags.keyedCombiningValueWithContext(
- "foo", accum1, CombineFnUtil.toFnWithContext(minFn).asKeyedFn());
-
- StateTag<?, ?> fooCoder2Max = StateTags.keyedCombiningValueWithContext(
- "foo", accum2, CombineFnUtil.toFnWithContext(maxFn).asKeyedFn());
- StateTag<?, ?> barCoder1Max = StateTags.keyedCombiningValueWithContext(
- "bar", accum1, CombineFnUtil.toFnWithContext(maxFn).asKeyedFn());
+ StateTag<?, ?> fooCoder1Max1 = StateTags.combiningValueWithContext(
+ "foo", accum1, CombineFnUtil.toFnWithContext(maxFn));
+ StateTag<?, ?> fooCoder1Max2 = StateTags.combiningValueWithContext(
+ "foo", accum1, CombineFnUtil.toFnWithContext(maxFn));
+ StateTag<?, ?> fooCoder1Min = StateTags.combiningValueWithContext(
+ "foo", accum1, CombineFnUtil.toFnWithContext(minFn));
+
+ StateTag<?, ?> fooCoder2Max = StateTags.combiningValueWithContext(
+ "foo", accum2, CombineFnUtil.toFnWithContext(maxFn));
+ StateTag<?, ?> barCoder1Max = StateTags.combiningValueWithContext(
+ "bar", accum1, CombineFnUtil.toFnWithContext(maxFn));
// Same name, coder and combineFn
assertEquals(fooCoder1Max1, fooCoder1Max2);
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
index 068b37f..92d87b5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
@@ -40,8 +40,7 @@ import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTag.StateBinder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CombineFnUtil;
@@ -283,11 +282,10 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
@SuppressWarnings("unchecked")
InMemoryState<? extends WatermarkHoldState> existingState =
(InMemoryState<? extends WatermarkHoldState>)
- underlying.get().get(namespace, address, c);
+ underlying.get().get(namespace, address, c);
return existingState.copy();
} else {
- return new InMemoryWatermarkHold<>(
- timestampCombiner);
+ return new InMemoryWatermarkHold<>(timestampCombiner);
}
}
@@ -298,7 +296,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
@SuppressWarnings("unchecked")
InMemoryState<? extends ValueState<T>> existingState =
(InMemoryState<? extends ValueState<T>>)
- underlying.get().get(namespace, address, c);
+ underlying.get().get(namespace, address, c);
return existingState.copy();
} else {
return new InMemoryValue<>();
@@ -306,10 +304,11 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
}
@Override
- public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
- bindCombiningValue(
+ public <InputT, AccumT, OutputT>
+ CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
+ Coder<AccumT> accumCoder,
+ CombineFn<InputT, AccumT, OutputT> combineFn) {
if (containedInUnderlying(namespace, address)) {
@SuppressWarnings("unchecked")
InMemoryState<? extends CombiningState<InputT, AccumT, OutputT>> existingState =
@@ -317,8 +316,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
underlying.get().get(namespace, address, c);
return existingState.copy();
} else {
- return new InMemoryCombiningState<>(
- key, combineFn.asKeyedFn());
+ return new InMemoryCombiningState<>(combineFn);
}
}
@@ -329,7 +327,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
@SuppressWarnings("unchecked")
InMemoryState<? extends BagState<T>> existingState =
(InMemoryState<? extends BagState<T>>)
- underlying.get().get(namespace, address, c);
+ underlying.get().get(namespace, address, c);
return existingState.copy();
} else {
return new InMemoryBag<>();
@@ -353,7 +351,8 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
@Override
public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
StateTag<? super K, MapState<KeyT, ValueT>> address,
- Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
+ Coder<KeyT> mapKeyCoder,
+ Coder<ValueT> mapValueCoder) {
if (containedInUnderlying(namespace, address)) {
@SuppressWarnings("unchecked")
InMemoryState<? extends MapState<KeyT, ValueT>> existingState =
@@ -366,30 +365,12 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
}
@Override
- public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
- bindKeyedCombiningValue(
+ public <InputT, AccumT, OutputT>
+ CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
- KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
- if (containedInUnderlying(namespace, address)) {
- @SuppressWarnings("unchecked")
- InMemoryState<? extends CombiningState<InputT, AccumT, OutputT>> existingState =
- (InMemoryState<? extends CombiningState<InputT, AccumT, OutputT>>)
- underlying.get().get(namespace, address, c);
- return existingState.copy();
- } else {
- return new InMemoryCombiningState<>(key, combineFn);
- }
- }
-
- @Override
- public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
- bindKeyedCombiningValueWithContext(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
- return bindKeyedCombiningValue(
- address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
+ CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+ return bindCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
}
};
}
@@ -475,20 +456,11 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
@Override
public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
- bindKeyedCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
- return underlying.get(namespace, address, c);
- }
-
- @Override
- public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
- bindKeyedCombiningValueWithContext(
+ bindCombiningValueWithContext(
StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
- KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
- return bindKeyedCombiningValue(
+ CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+ return bindCombiningValue(
address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
}
};
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
index f0aeece..4d04745 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
@@ -40,7 +40,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
@@ -251,39 +250,6 @@ public class CopyOnAccessInMemoryStateInternalsTest {
}
@Test
- public void testKeyedAccumulatorCombiningStateWithUnderlying() throws Exception {
- CopyOnAccessInMemoryStateInternals<String> underlying =
- CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
- KeyedCombineFn<String, Long, long[], Long> sumLongFn = Sum.ofLongs().asKeyedFn();
-
- StateNamespace namespace = new StateNamespaceForTest("foo");
- CoderRegistry reg = pipeline.getCoderRegistry();
- StateTag<String, CombiningState<Long, long[], Long>> stateTag =
- StateTags.keyedCombiningValue(
- "summer",
- sumLongFn.getAccumulatorCoder(
- reg, StringUtf8Coder.of(), reg.getDefaultCoder(Long.class)),
- sumLongFn);
- GroupingState<Long, Long> underlyingValue = underlying.state(namespace, stateTag);
- assertThat(underlyingValue.read(), equalTo(0L));
-
- underlyingValue.add(1L);
- assertThat(underlyingValue.read(), equalTo(1L));
-
- CopyOnAccessInMemoryStateInternals<String> internals =
- CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
- GroupingState<Long, Long> copyOnAccessState = internals.state(namespace, stateTag);
- assertThat(copyOnAccessState.read(), equalTo(1L));
-
- copyOnAccessState.add(4L);
- assertThat(copyOnAccessState.read(), equalTo(5L));
- assertThat(underlyingValue.read(), equalTo(1L));
-
- GroupingState<Long, Long> reReadUnderlyingValue = underlying.state(namespace, stateTag);
- assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read()));
- }
-
- @Test
public void testWatermarkHoldStateWithUnderlying() {
CopyOnAccessInMemoryStateInternals<String> underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
index 99de5be..6a7689a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
@@ -188,8 +188,7 @@ class FlinkBatchTransformTranslators {
DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
context.getInputDataSet(context.getInput(transform));
- Combine.KeyedCombineFn<K, InputT, List<InputT>, List<InputT>> combineFn =
- new Concatenate<InputT>().asKeyedFn();
+ Combine.CombineFn<InputT, List<InputT>, List<InputT>> combineFn = new Concatenate<>();
KvCoder<K, InputT> inputCoder =
(KvCoder<K, InputT>) context.getInput(transform).getCoder();
@@ -200,7 +199,6 @@ class FlinkBatchTransformTranslators {
accumulatorCoder =
combineFn.getAccumulatorCoder(
context.getInput(transform).getPipeline().getCoderRegistry(),
- inputCoder.getKeyCoder(),
inputCoder.getValueCoder());
} catch (CannotProvideCoderException e) {
throw new RuntimeException(e);
@@ -337,8 +335,8 @@ class FlinkBatchTransformTranslators {
DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
context.getInputDataSet(context.getInput(transform));
- CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn =
- (CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT>) transform.getFn();
+ CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> combineFn =
+ (CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT>) transform.getFn();
KvCoder<K, InputT> inputCoder =
(KvCoder<K, InputT>) context.getInput(transform).getCoder();
@@ -349,7 +347,6 @@ class FlinkBatchTransformTranslators {
accumulatorCoder =
combineFn.getAccumulatorCoder(
context.getInput(transform).getPipeline().getCoderRegistry(),
- inputCoder.getKeyCoder(),
inputCoder.getValueCoder());
} catch (CannotProvideCoderException e) {
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java
index 83ff70d..6e27057 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java
@@ -19,8 +19,8 @@ package org.apache.beam.runners.flink.translation.functions;
import com.google.common.collect.ImmutableList;
import java.util.Collection;
-import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.core.PerKeyCombineFnRunners;
+import org.apache.beam.runners.core.GlobalCombineFnRunner;
+import org.apache.beam.runners.core.GlobalCombineFnRunners;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -53,7 +53,7 @@ public abstract class AbstractFlinkCombineRunner<
Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception;
/**
- * Adapter interface that allows using a {@link CombineFnBase.PerKeyCombineFn} to either produce
+ * Adapter interface that allows using a {@link CombineFnBase.GlobalCombineFn} to either produce
* the {@code AccumT} as output or to combine several accumulators into an {@code OutputT}.
* The former would be used for a partial combine while the latter is used for the final merging
* of accumulators.
@@ -72,17 +72,17 @@ public abstract class AbstractFlinkCombineRunner<
}
/**
- * A straight wrapper of {@link CombineFnBase.PerKeyCombineFn} that takes in {@code InputT}
+ * A straight wrapper of {@link CombineFnBase.GlobalCombineFn} that takes in {@code InputT}
* and emits {@code OutputT}.
*/
public static class CompleteFlinkCombiner<K, InputT, AccumT, OutputT> implements
FlinkCombiner<K, InputT, AccumT, OutputT> {
- private final PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> combineFnRunner;
+ private final GlobalCombineFnRunner<InputT, AccumT, OutputT> combineFnRunner;
public CompleteFlinkCombiner(
- CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn) {
- combineFnRunner = PerKeyCombineFnRunners.create(combineFn);
+ CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> combineFn) {
+ combineFnRunner = GlobalCombineFnRunners.create(combineFn);
}
@Override
@@ -90,22 +90,22 @@ public abstract class AbstractFlinkCombineRunner<
K key, InputT value, PipelineOptions options, SideInputReader sideInputReader,
Collection<? extends BoundedWindow> windows) {
AccumT accumulator =
- combineFnRunner.createAccumulator(key, options, sideInputReader, windows);
- return combineFnRunner.addInput(key, accumulator, value, options, sideInputReader, windows);
+ combineFnRunner.createAccumulator(options, sideInputReader, windows);
+ return combineFnRunner.addInput(accumulator, value, options, sideInputReader, windows);
}
@Override
public AccumT addInput(
K key, AccumT accumulator, InputT value, PipelineOptions options,
SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return combineFnRunner.addInput(key, accumulator, value, options, sideInputReader, windows);
+ return combineFnRunner.addInput(accumulator, value, options, sideInputReader, windows);
}
@Override
public OutputT extractOutput(
K key, AccumT accumulator, PipelineOptions options, SideInputReader sideInputReader,
Collection<? extends BoundedWindow> windows) {
- return combineFnRunner.extractOutput(key, accumulator, options, sideInputReader, windows);
+ return combineFnRunner.extractOutput(accumulator, options, sideInputReader, windows);
}
}
@@ -115,10 +115,10 @@ public abstract class AbstractFlinkCombineRunner<
public static class PartialFlinkCombiner<K, InputT, AccumT> implements
FlinkCombiner<K, InputT, AccumT, AccumT> {
- private final PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner;
+ private final GlobalCombineFnRunner<InputT, AccumT, ?> combineFnRunner;
- public PartialFlinkCombiner(CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn) {
- combineFnRunner = PerKeyCombineFnRunners.create(combineFn);
+ public PartialFlinkCombiner(CombineFnBase.GlobalCombineFn<InputT, AccumT, ?> combineFn) {
+ combineFnRunner = GlobalCombineFnRunners.create(combineFn);
}
@Override
@@ -126,15 +126,15 @@ public abstract class AbstractFlinkCombineRunner<
K key, InputT value, PipelineOptions options, SideInputReader sideInputReader,
Collection<? extends BoundedWindow> windows) {
AccumT accumulator =
- combineFnRunner.createAccumulator(key, options, sideInputReader, windows);
- return combineFnRunner.addInput(key, accumulator, value, options, sideInputReader, windows);
+ combineFnRunner.createAccumulator(options, sideInputReader, windows);
+ return combineFnRunner.addInput(accumulator, value, options, sideInputReader, windows);
}
@Override
public AccumT addInput(
K key, AccumT accumulator, InputT value, PipelineOptions options,
SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return combineFnRunner.addInput(key, accumulator, value, options, sideInputReader, windows);
+ return combineFnRunner.addInput(accumulator, value, options, sideInputReader, windows);
}
@Override
@@ -151,10 +151,10 @@ public abstract class AbstractFlinkCombineRunner<
public static class FinalFlinkCombiner<K, AccumT, OutputT> implements
FlinkCombiner<K, AccumT, AccumT, OutputT> {
- private final PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner;
+ private final GlobalCombineFnRunner<?, AccumT, OutputT> combineFnRunner;
- public FinalFlinkCombiner(CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> combineFn) {
- combineFnRunner = PerKeyCombineFnRunners.create(combineFn);
+ public FinalFlinkCombiner(CombineFnBase.GlobalCombineFn<?, AccumT, OutputT> combineFn) {
+ combineFnRunner = GlobalCombineFnRunners.create(combineFn);
}
@Override
@@ -169,14 +169,14 @@ public abstract class AbstractFlinkCombineRunner<
K key, AccumT accumulator, AccumT value, PipelineOptions options,
SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
return combineFnRunner.mergeAccumulators(
- key, ImmutableList.of(accumulator, value), options, sideInputReader, windows);
+ ImmutableList.of(accumulator, value), options, sideInputReader, windows);
}
@Override
public OutputT extractOutput(
K key, AccumT accumulator, PipelineOptions options, SideInputReader sideInputReader,
Collection<? extends BoundedWindow> windows) {
- return combineFnRunner.extractOutput(key, accumulator, options, sideInputReader, windows);
+ return combineFnRunner.extractOutput(accumulator, options, sideInputReader, windows);
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
index 3712598..9ccf079 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
@@ -41,7 +41,7 @@ public class FlinkMergingNonShuffleReduceFunction<
K, InputT, AccumT, OutputT, W extends BoundedWindow>
extends RichGroupReduceFunction<WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, OutputT>>> {
- private final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn;
+ private final CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> combineFn;
private final WindowingStrategy<Object, W> windowingStrategy;
@@ -50,12 +50,12 @@ public class FlinkMergingNonShuffleReduceFunction<
private final SerializedPipelineOptions serializedOptions;
public FlinkMergingNonShuffleReduceFunction(
- CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn,
+ CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> combineFn,
WindowingStrategy<Object, W> windowingStrategy,
Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
PipelineOptions pipelineOptions) {
- this.combineFn = keyedCombineFn;
+ this.combineFn = combineFn;
this.windowingStrategy = windowingStrategy;
this.sideInputs = sideInputs;
@@ -75,7 +75,6 @@ public class FlinkMergingNonShuffleReduceFunction<
new FlinkSideInputReader(sideInputs, getRuntimeContext());
AbstractFlinkCombineRunner<K, InputT, AccumT, OutputT, W> reduceRunner;
-
if (windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) {
reduceRunner = new SortingFlinkCombineRunner<>();
} else {
@@ -83,13 +82,12 @@ public class FlinkMergingNonShuffleReduceFunction<
}
reduceRunner.combine(
- new AbstractFlinkCombineRunner.CompleteFlinkCombiner<>(combineFn),
+ new AbstractFlinkCombineRunner.CompleteFlinkCombiner<K, InputT, AccumT, OutputT>(combineFn),
windowingStrategy,
sideInputReader,
options,
elements,
out);
-
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
index 9a44840..4099f52 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
@@ -42,7 +42,7 @@ import org.apache.flink.util.Collector;
public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWindow>
extends RichGroupCombineFunction<WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, AccumT>>> {
- protected final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn;
+ protected final CombineFnBase.GlobalCombineFn<InputT, AccumT, ?> combineFn;
protected final WindowingStrategy<Object, W> windowingStrategy;
@@ -51,7 +51,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
public FlinkPartialReduceFunction(
- CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn,
+ CombineFnBase.GlobalCombineFn<InputT, AccumT, ?> combineFn,
WindowingStrategy<Object, W> windowingStrategy,
Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
PipelineOptions pipelineOptions) {
@@ -83,7 +83,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
}
reduceRunner.combine(
- new AbstractFlinkCombineRunner.PartialFlinkCombiner<>(combineFn),
+ new AbstractFlinkCombineRunner.PartialFlinkCombiner<K, InputT, AccumT>(combineFn),
windowingStrategy,
sideInputReader,
options,
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
index 6c1a2e4..90dcbff 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
@@ -42,7 +42,7 @@ import org.apache.flink.util.Collector;
public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
extends RichGroupReduceFunction<WindowedValue<KV<K, AccumT>>, WindowedValue<KV<K, OutputT>>> {
- protected final CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> combineFn;
+ protected final CombineFnBase.GlobalCombineFn<?, AccumT, OutputT> combineFn;
protected final WindowingStrategy<Object, W> windowingStrategy;
@@ -51,12 +51,12 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
protected final SerializedPipelineOptions serializedOptions;
public FlinkReduceFunction(
- CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> keyedCombineFn,
+ CombineFnBase.GlobalCombineFn<?, AccumT, OutputT> combineFn,
WindowingStrategy<Object, W> windowingStrategy,
Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
PipelineOptions pipelineOptions) {
- this.combineFn = keyedCombineFn;
+ this.combineFn = combineFn;
this.windowingStrategy = windowingStrategy;
this.sideInputs = sideInputs;
@@ -83,15 +83,13 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
} else {
reduceRunner = new SortingFlinkCombineRunner<>();
}
-
reduceRunner.combine(
- new AbstractFlinkCombineRunner.FinalFlinkCombiner<>(combineFn),
+ new AbstractFlinkCombineRunner.FinalFlinkCombiner<K, AccumT, OutputT>(combineFn),
windowingStrategy,
sideInputReader,
options,
elements,
out);
-
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
index eac465c..4aacb4a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
@@ -43,7 +43,6 @@ import org.joda.time.Instant;
public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends BoundedWindow>
extends AbstractFlinkCombineRunner<K, InputT, AccumT, OutputT, W> {
-
@Override
public void combine(
FlinkCombiner<K, InputT, AccumT, OutputT> flinkCombiner,
[3/6] beam git commit: Remove KeyedCombineFn
Posted by ke...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 5ffaef8..0be8517 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -43,12 +43,9 @@ import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase.AbstractGlobalCombineFn;
-import org.apache.beam.sdk.transforms.CombineFnBase.AbstractPerKeyCombineFn;
import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
-import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.CombineWithContext.Context;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.CombineWithContext.RequiresContextInternal;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -155,7 +152,7 @@ public class Combine {
*/
public static <K, V> PerKey<K, V, V> perKey(
SerializableFunction<Iterable<V>, V> fn) {
- return perKey(IterableCombineFn.of(fn).<K>asKeyedFn(), displayDataForFn(fn));
+ return perKey(IterableCombineFn.of(fn), displayDataForFn(fn));
}
/**
@@ -176,32 +173,11 @@ public class Combine {
*/
public static <K, InputT, OutputT> PerKey<K, InputT, OutputT> perKey(
GlobalCombineFn<? super InputT, ?, OutputT> fn) {
- return perKey(fn.<K>asKeyedFn(), displayDataForFn(fn));
- }
-
- /**
- * Returns a {@link PerKey Combine.PerKey} {@code PTransform} that
- * first groups its input {@code PCollection} of {@code KV}s by keys and
- * windows, then invokes the given function on each of the key/values-lists
- * pairs to produce a combined value, and then returns a
- * {@code PCollection} of {@code KV}s mapping each distinct key to
- * its combined value for each window.
- *
- * <p>Each output element is in the window by which its corresponding input
- * was grouped, and has the timestamp of the end of that window. The output
- * {@code PCollection} has the same
- * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
- * as the input.
- *
- * <p>See {@link PerKey Combine.PerKey} for more information.
- */
- public static <K, InputT, OutputT> PerKey<K, InputT, OutputT> perKey(
- PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn) {
return perKey(fn, displayDataForFn(fn));
}
private static <K, InputT, OutputT> PerKey<K, InputT, OutputT> perKey(
- PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
+ GlobalCombineFn<? super InputT, ?, OutputT> fn,
DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
return new PerKey<>(fn, fnDisplayData, false /*fewKeys*/);
}
@@ -211,7 +187,7 @@ public class Combine {
* in {@link GroupByKey}.
*/
private static <K, InputT, OutputT> PerKey<K, InputT, OutputT> fewKeys(
- PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
+ GlobalCombineFn<? super InputT, ?, OutputT> fn,
DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
return new PerKey<>(fn, fnDisplayData, true /*fewKeys*/);
}
@@ -239,7 +215,7 @@ public class Combine {
*/
public static <K, V> GroupedValues<K, V, V> groupedValues(
SerializableFunction<Iterable<V>, V> fn) {
- return groupedValues(IterableCombineFn.of(fn).<K>asKeyedFn(), displayDataForFn(fn));
+ return groupedValues(IterableCombineFn.of(fn), displayDataForFn(fn));
}
/**
@@ -265,37 +241,11 @@ public class Combine {
*/
public static <K, InputT, OutputT> GroupedValues<K, InputT, OutputT> groupedValues(
GlobalCombineFn<? super InputT, ?, OutputT> fn) {
- return groupedValues(fn.<K>asKeyedFn(), displayDataForFn(fn));
- }
-
- /**
- * Returns a {@link GroupedValues Combine.GroupedValues}
- * {@code PTransform} that takes a {@code PCollection} of
- * {@code KV}s where a key maps to an {@code Iterable} of values, e.g.,
- * the result of a {@code GroupByKey}, then uses the given
- * {@code KeyedCombineFn} to combine all the values associated with
- * each key. The combining function is provided the key. The types
- * of the input and output values can differ.
- *
- * <p>Each output element has the same timestamp and is in the same window
- * as its corresponding input element, and the output
- * {@code PCollection} has the same
- * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
- * associated with it as the input.
- *
- * <p>See {@link GroupedValues Combine.GroupedValues} for more information.
- *
- * <p>Note that {@link #perKey(CombineFnBase.PerKeyCombineFn)} is typically
- * more convenient to use than {@link GroupByKey} followed by
- * {@code groupedValues(...)}.
- */
- public static <K, InputT, OutputT> GroupedValues<K, InputT, OutputT> groupedValues(
- PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn) {
return groupedValues(fn, displayDataForFn(fn));
}
private static <K, InputT, OutputT> GroupedValues<K, InputT, OutputT> groupedValues(
- PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
+ GlobalCombineFn<? super InputT, ?, OutputT> fn,
DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
return new GroupedValues<>(fn, fnDisplayData);
}
@@ -471,81 +421,8 @@ public class Combine {
public TypeDescriptor<OutputT> getOutputType() {
return new TypeDescriptor<OutputT>(getClass()) {};
}
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- @Override
- public <K> KeyedCombineFn<K, InputT, AccumT, OutputT> asKeyedFn() {
- // The key, an object, is never even looked at.
- return new KeyIgnoringCombineFn<>(this);
- }
-
- private static class KeyIgnoringCombineFn<K, InputT, AccumT, OutputT>
- extends KeyedCombineFn<K, InputT, AccumT, OutputT>
- implements NameOverride {
-
- private final CombineFn<InputT, AccumT, OutputT> fn;
-
- private KeyIgnoringCombineFn(CombineFn<InputT, AccumT, OutputT> fn) {
- this.fn = fn;
- }
-
- @Override
- public AccumT createAccumulator(K key) {
- return fn.createAccumulator();
- }
-
- @Override
- public AccumT addInput(K key, AccumT accumulator, InputT input) {
- return fn.addInput(accumulator, input);
- }
-
- @Override
- public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators) {
- return fn.mergeAccumulators(accumulators);
- }
-
- @Override
- public OutputT extractOutput(K key, AccumT accumulator) {
- return fn.extractOutput(accumulator);
- }
-
- @Override
- public AccumT compact(K key, AccumT accumulator) {
- return fn.compact(accumulator);
- }
-
- @Override
- public Coder<AccumT> getAccumulatorCoder(
- CoderRegistry registry, Coder<K> keyCoder, Coder<InputT> inputCoder)
- throws CannotProvideCoderException {
- return fn.getAccumulatorCoder(registry, inputCoder);
- }
-
- @Override
- public Coder<OutputT> getDefaultOutputCoder(
- CoderRegistry registry, Coder<K> keyCoder, Coder<InputT> inputCoder)
- throws CannotProvideCoderException {
- return fn.getDefaultOutputCoder(registry, inputCoder);
- }
-
- @Override
- public CombineFn<InputT, AccumT, OutputT> forKey(K key, Coder<K> keyCoder) {
- return fn;
- }
-
- @Override
- public void populateDisplayData(Builder builder) {
- builder.delegate(fn);
- }
-
- @Override
- public String getNameOverride() {
- return NameUtils.approximateSimpleName(fn);
- }
- }
}
-
/////////////////////////////////////////////////////////////////////////////
/**
@@ -621,7 +498,6 @@ public class Combine {
public Coder<V> getDefaultOutputCoder(CoderRegistry registry, Coder<V> inputCoder) {
return inputCoder;
}
-
}
/**
@@ -1083,215 +959,6 @@ public class Combine {
/////////////////////////////////////////////////////////////////////////////
-
- /**
- * A {@code KeyedCombineFn<K, InputT, AccumT, OutputT>} specifies how to combine
- * a collection of input values of type {@code InputT}, associated with
- * a key of type {@code K}, into a single output value of type
- * {@code OutputT}. It does this via one or more intermediate mutable
- * accumulator values of type {@code AccumT}.
- *
- * <p>The overall process to combine a collection of input
- * {@code InputT} values associated with an input {@code K} key into a
- * single output {@code OutputT} value is as follows:
- *
- * <ol>
- *
- * <li> The input {@code InputT} values are partitioned into one or more
- * batches.
- *
- * <li> For each batch, the {@link #createAccumulator} operation is
- * invoked to create a fresh mutable accumulator value of type
- * {@code AccumT}, initialized to represent the combination of zero
- * values.
- *
- * <li> For each input {@code InputT} value in a batch, the
- * {@link #addInput} operation is invoked to add the value to that
- * batch's accumulator {@code AccumT} value. The accumulator may just
- * record the new value (e.g., if {@code AccumT == List<InputT>}, or may do
- * work to represent the combination more compactly.
- *
- * <li> The {@link #mergeAccumulators} operation is invoked to
- * combine a collection of accumulator {@code AccumT} values into a
- * single combined output accumulator {@code AccumT} value, once the
- * merging accumulators have had all all the input values in their
- * batches added to them. This operation is invoked repeatedly,
- * until there is only one accumulator value left.
- *
- * <li> The {@link #extractOutput} operation is invoked on the final
- * accumulator {@code AccumT} value to get the output {@code OutputT} value.
- *
- * </ol>
- *
- * <p>All of these operations are passed the {@code K} key that the
- * values being combined are associated with.
- *
- * <p>For example:
- * <pre> {@code
- * public class ConcatFn
- * extends KeyedCombineFn<String, Integer, ConcatFn.Accum, String> {
- * public static class Accum {
- * String s = "";
- * }
- * public Accum createAccumulator(String key) {
- * return new Accum();
- * }
- * public Accum addInput(String key, Accum accum, Integer input) {
- * accum.s += "+" + input;
- * return accum;
- * }
- * public Accum mergeAccumulators(String key, Iterable<Accum> accums) {
- * Accum merged = new Accum();
- * for (Accum accum : accums) {
- * merged.s += accum.s;
- * }
- * return merged;
- * }
- * public String extractOutput(String key, Accum accum) {
- * return key + accum.s;
- * }
- * }
- * PCollection<KV<String, Integer>> pc = ...;
- * PCollection<KV<String, String>> pc2 = pc.apply(
- * Combine.perKey(new ConcatFn()));
- * } </pre>
- *
- * <p>Keyed combining functions used by {@link Combine.PerKey},
- * {@link Combine.GroupedValues}, and {@code PTransforms} derived
- * from them should be <i>associative</i> and <i>commutative</i>.
- * Associativity is required because input values are first broken
- * up into subgroups before being combined, and their intermediate
- * results further combined, in an arbitrary tree structure.
- * Commutativity is required because any order of the input values
- * is ignored when breaking up input values into groups.
- *
- * @param <K> type of keys
- * @param <InputT> type of input values
- * @param <AccumT> type of mutable accumulator values
- * @param <OutputT> type of output values
- */
- public abstract static class KeyedCombineFn<K, InputT, AccumT, OutputT>
- extends AbstractPerKeyCombineFn<K, InputT, AccumT, OutputT> {
- /**
- * Returns a new, mutable accumulator value representing the accumulation of zero input values.
- *
- * @param key the key that all the accumulated values using the
- * accumulator are associated with
- */
- public abstract AccumT createAccumulator(K key);
-
- /**
- * Adds the given input value to the given accumulator, returning the new accumulator value.
- *
- * <p>For efficiency, the input accumulator may be modified and returned.
- *
- * @param key the key that all the accumulated values using the
- * accumulator are associated with
- */
- public abstract AccumT addInput(K key, AccumT accumulator, InputT value);
-
- /**
- * Returns an accumulator representing the accumulation of all the
- * input values accumulated in the merging accumulators.
- *
- * <p>May modify any of the argument accumulators. May return a
- * fresh accumulator, or may return one of the (modified) argument
- * accumulators.
- *
- * @param key the key that all the accumulators are associated
- * with
- */
- public abstract AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators);
-
- /**
- * Returns the output value that is the result of combining all
- * the input values represented by the given accumulator.
- *
- * @param key the key that all the accumulated values using the
- * accumulator are associated with
- */
- public abstract OutputT extractOutput(K key, AccumT accumulator);
-
- /**
- * Returns an accumulator that represents the same logical value as the
- * input accumulator, but may have a more compact representation.
- *
- * <p>For most CombineFns this would be a no-op, but should be overridden
- * by CombineFns that (for example) buffer up elements and combine
- * them in batches.
- *
- * <p>For efficiency, the input accumulator may be modified and returned.
- *
- * <p>By default returns the original accumulator.
- */
- public AccumT compact(K key, AccumT accumulator) {
- return accumulator;
- }
-
- @Override
- public CombineFn<InputT, AccumT, OutputT> forKey(final K key, final Coder<K> keyCoder) {
- return new CombineFn<InputT, AccumT, OutputT>() {
-
- @Override
- public AccumT createAccumulator() {
- return KeyedCombineFn.this.createAccumulator(key);
- }
-
- @Override
- public AccumT addInput(AccumT accumulator, InputT input) {
- return KeyedCombineFn.this.addInput(key, accumulator, input);
- }
-
- @Override
- public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
- return KeyedCombineFn.this.mergeAccumulators(key, accumulators);
- }
-
- @Override
- public OutputT extractOutput(AccumT accumulator) {
- return KeyedCombineFn.this.extractOutput(key, accumulator);
- }
-
- @Override
- public AccumT compact(AccumT accumulator) {
- return KeyedCombineFn.this.compact(key, accumulator);
- }
-
- @Override
- public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<InputT> inputCoder)
- throws CannotProvideCoderException {
- return KeyedCombineFn.this.getAccumulatorCoder(registry, keyCoder, inputCoder);
- }
-
- @Override
- public Coder<OutputT> getDefaultOutputCoder(
- CoderRegistry registry, Coder<InputT> inputCoder) throws CannotProvideCoderException {
- return KeyedCombineFn.this.getDefaultOutputCoder(registry, keyCoder, inputCoder);
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- builder.delegate(KeyedCombineFn.this);
- }
- };
- }
-
- /**
- * Applies this {@code KeyedCombineFn} to a key and a collection
- * of input values to produce a combined output value.
- *
- * <p>Useful when testing the behavior of a {@code KeyedCombineFn}
- * separately from a {@code Combine} transform.
- */
- public OutputT apply(K key, Iterable<? extends InputT> inputs) {
- AccumT accum = createAccumulator(key);
- for (InputT input : inputs) {
- accum = addInput(key, accum, input);
- }
- return extractOutput(key, accum);
- }
- }
-
////////////////////////////////////////////////////////////////////////////
/**
@@ -1458,8 +1125,7 @@ public class Combine {
.apply(WithKeys.<Void, InputT>of((Void) null))
.setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()));
- Combine.PerKey<Void, InputT, OutputT> combine =
- Combine.fewKeys(fn.asKeyedFn(), fnDisplayData);
+ Combine.PerKey<Void, InputT, OutputT> combine = Combine.fewKeys(fn, fnDisplayData);
if (!sideInputs.isEmpty()) {
combine = combine.withSideInputs(sideInputs);
}
@@ -1788,13 +1454,13 @@ public class Combine {
public static class PerKey<K, InputT, OutputT>
extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> {
- private final PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn;
+ private final GlobalCombineFn<? super InputT, ?, OutputT> fn;
private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
private final boolean fewKeys;
private final List<PCollectionView<?>> sideInputs;
private PerKey(
- PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
+ GlobalCombineFn<? super InputT, ?, OutputT> fn,
DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean fewKeys) {
this.fn = fn;
this.fnDisplayData = fnDisplayData;
@@ -1803,7 +1469,7 @@ public class Combine {
}
private PerKey(
- PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
+ GlobalCombineFn<? super InputT, ?, OutputT> fn,
DisplayData.ItemSpec<? extends Class<?>> fnDisplayData,
boolean fewKeys, List<PCollectionView<?>> sideInputs) {
this.fn = fn;
@@ -1819,7 +1485,7 @@ public class Combine {
/**
* Returns a {@link PTransform} identical to this, but with the specified side inputs to use
- * in {@link KeyedCombineFnWithContext}.
+ * in {@link CombineFnWithContext}.
*/
public PerKey<K, InputT, OutputT> withSideInputs(PCollectionView<?>... sideInputs) {
return withSideInputs(Arrays.asList(sideInputs));
@@ -1827,7 +1493,7 @@ public class Combine {
/**
* Returns a {@link PTransform} identical to this, but with the specified side inputs to use
- * in {@link KeyedCombineFnWithContext}.
+ * in {@link CombineFnWithContext}.
*/
public PerKey<K, InputT, OutputT> withSideInputs(
Iterable<? extends PCollectionView<?>> sideInputs) {
@@ -1874,9 +1540,9 @@ public class Combine {
}
/**
- * Returns the {@link PerKeyCombineFn} used by this Combine operation.
+ * Returns the {@link GlobalCombineFn} used by this Combine operation.
*/
- public PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> getFn() {
+ public GlobalCombineFn<? super InputT, ?, OutputT> getFn() {
return fn;
}
@@ -1924,12 +1590,12 @@ public class Combine {
public static class PerKeyWithHotKeyFanout<K, InputT, OutputT>
extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> {
- private final PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn;
+ private final GlobalCombineFn<? super InputT, ?, OutputT> fn;
private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
private final SerializableFunction<? super K, Integer> hotKeyFanout;
private PerKeyWithHotKeyFanout(
- PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
+ GlobalCombineFn<? super InputT, ?, OutputT> fn,
DisplayData.ItemSpec<? extends Class<?>> fnDisplayData,
SerializableFunction<? super K, Integer> hotKeyFanout) {
this.fn = fn;
@@ -1951,8 +1617,8 @@ public class Combine {
// Name the accumulator type.
@SuppressWarnings("unchecked")
- final PerKeyCombineFn<K, InputT, AccumT, OutputT> typedFn =
- (PerKeyCombineFn<K, InputT, AccumT, OutputT>) this.fn;
+ final GlobalCombineFn<InputT, AccumT, OutputT> typedFn =
+ (GlobalCombineFn<InputT, AccumT, OutputT>) this.fn;
if (!(input.getCoder() instanceof KvCoder)) {
throw new IllegalStateException(
@@ -1966,7 +1632,7 @@ public class Combine {
try {
accumCoder = typedFn.getAccumulatorCoder(
input.getPipeline().getCoderRegistry(),
- inputCoder.getKeyCoder(), inputCoder.getValueCoder());
+ inputCoder.getValueCoder());
} catch (CannotProvideCoderException e) {
throw new IllegalStateException("Unable to determine accumulator coder.", e);
}
@@ -1979,38 +1645,37 @@ public class Combine {
// set of values, then drop the nonce and do a final combine of the
// aggregates. We do this by splitting the original CombineFn into two,
// on that does addInput + merge and another that does merge + extract.
- PerKeyCombineFn<KV<K, Integer>, InputT, AccumT, AccumT> hotPreCombine;
- PerKeyCombineFn<K, InputOrAccum<InputT, AccumT>, AccumT, OutputT> postCombine;
- if (typedFn instanceof KeyedCombineFn) {
- final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedFn =
- (KeyedCombineFn<K, InputT, AccumT, OutputT>) typedFn;
+ GlobalCombineFn<InputT, AccumT, AccumT> hotPreCombine;
+ GlobalCombineFn<InputOrAccum<InputT, AccumT>, AccumT, OutputT> postCombine;
+ if (typedFn instanceof CombineFn) {
+ final CombineFn<InputT, AccumT, OutputT> fn =
+ (CombineFn<InputT, AccumT, OutputT>) typedFn;
hotPreCombine =
- new KeyedCombineFn<KV<K, Integer>, InputT, AccumT, AccumT>() {
+ new CombineFn<InputT, AccumT, AccumT>() {
@Override
- public AccumT createAccumulator(KV<K, Integer> key) {
- return keyedFn.createAccumulator(key.getKey());
+ public AccumT createAccumulator() {
+ return fn.createAccumulator();
}
@Override
- public AccumT addInput(KV<K, Integer> key, AccumT accumulator, InputT value) {
- return keyedFn.addInput(key.getKey(), accumulator, value);
+ public AccumT addInput(AccumT accumulator, InputT value) {
+ return fn.addInput(accumulator, value);
}
@Override
- public AccumT mergeAccumulators(
- KV<K, Integer> key, Iterable<AccumT> accumulators) {
- return keyedFn.mergeAccumulators(key.getKey(), accumulators);
+ public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
+ return fn.mergeAccumulators(accumulators);
}
@Override
- public AccumT compact(KV<K, Integer> key, AccumT accumulator) {
- return keyedFn.compact(key.getKey(), accumulator);
+ public AccumT compact(AccumT accumulator) {
+ return fn.compact(accumulator);
}
@Override
- public AccumT extractOutput(KV<K, Integer> key, AccumT accumulator) {
+ public AccumT extractOutput(AccumT accumulator) {
return accumulator;
}
@Override
@SuppressWarnings("unchecked")
public Coder<AccumT> getAccumulatorCoder(
- CoderRegistry registry, Coder<KV<K, Integer>> keyCoder, Coder<InputT> inputCoder)
+ CoderRegistry registry, Coder<InputT> inputCoder)
throws CannotProvideCoderException {
return accumCoder;
}
@@ -2020,142 +1685,147 @@ public class Combine {
builder.delegate(PerKeyWithHotKeyFanout.this);
}
};
+
postCombine =
- new KeyedCombineFn<K, InputOrAccum<InputT, AccumT>, AccumT, OutputT>() {
+ new CombineFn<InputOrAccum<InputT, AccumT>, AccumT, OutputT>() {
@Override
- public AccumT createAccumulator(K key) {
- return keyedFn.createAccumulator(key);
+ public AccumT createAccumulator() {
+ return fn.createAccumulator();
}
+
@Override
- public AccumT addInput(
- K key, AccumT accumulator, InputOrAccum<InputT, AccumT> value) {
+ public AccumT addInput(AccumT accumulator, InputOrAccum<InputT, AccumT> value) {
if (value.accum == null) {
- return keyedFn.addInput(key, accumulator, value.input);
+ return fn.addInput(accumulator, value.input);
} else {
- return keyedFn.mergeAccumulators(key, ImmutableList.of(accumulator, value.accum));
+ return fn.mergeAccumulators(ImmutableList.of(accumulator, value.accum));
}
}
+
@Override
- public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators) {
- return keyedFn.mergeAccumulators(key, accumulators);
+ public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
+ return fn.mergeAccumulators(accumulators);
}
+
@Override
- public AccumT compact(K key, AccumT accumulator) {
- return keyedFn.compact(key, accumulator);
+ public AccumT compact(AccumT accumulator) {
+ return fn.compact(accumulator);
}
+
@Override
- public OutputT extractOutput(K key, AccumT accumulator) {
- return keyedFn.extractOutput(key, accumulator);
+ public OutputT extractOutput(AccumT accumulator) {
+ return fn.extractOutput(accumulator);
}
+
@Override
public Coder<OutputT> getDefaultOutputCoder(
- CoderRegistry registry,
- Coder<K> keyCoder,
- Coder<InputOrAccum<InputT, AccumT>> accumulatorCoder)
+ CoderRegistry registry, Coder<InputOrAccum<InputT, AccumT>> accumulatorCoder)
throws CannotProvideCoderException {
- return keyedFn.getDefaultOutputCoder(
- registry, keyCoder, inputCoder.getValueCoder());
+ return fn.getDefaultOutputCoder(registry, inputCoder.getValueCoder());
}
@Override
- public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder,
- Coder<InputOrAccum<InputT, AccumT>> inputCoder)
- throws CannotProvideCoderException {
+ public Coder<AccumT> getAccumulatorCoder(
+ CoderRegistry registry, Coder<InputOrAccum<InputT, AccumT>> inputCoder)
+ throws CannotProvideCoderException {
return accumCoder;
}
+
@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder.delegate(PerKeyWithHotKeyFanout.this);
}
};
- } else if (typedFn instanceof KeyedCombineFnWithContext) {
- final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedFnWithContext =
- (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) typedFn;
+ } else if (typedFn instanceof CombineFnWithContext) {
+ final CombineFnWithContext<InputT, AccumT, OutputT> fnWithContext =
+ (CombineFnWithContext<InputT, AccumT, OutputT>) typedFn;
hotPreCombine =
- new KeyedCombineFnWithContext<KV<K, Integer>, InputT, AccumT, AccumT>() {
+ new CombineFnWithContext<InputT, AccumT, AccumT>() {
@Override
- public AccumT createAccumulator(KV<K, Integer> key, Context c) {
- return keyedFnWithContext.createAccumulator(key.getKey(), c);
+ public AccumT createAccumulator(Context c) {
+ return fnWithContext.createAccumulator(c);
}
@Override
- public AccumT addInput(
- KV<K, Integer> key, AccumT accumulator, InputT value, Context c) {
- return keyedFnWithContext.addInput(key.getKey(), accumulator, value, c);
+ public AccumT addInput(AccumT accumulator, InputT value, Context c) {
+ return fnWithContext.addInput(accumulator, value, c);
}
@Override
- public AccumT mergeAccumulators(
- KV<K, Integer> key, Iterable<AccumT> accumulators, Context c) {
- return keyedFnWithContext.mergeAccumulators(key.getKey(), accumulators, c);
+ public AccumT mergeAccumulators(Iterable<AccumT> accumulators, Context c) {
+ return fnWithContext.mergeAccumulators(accumulators, c);
}
@Override
- public AccumT compact(KV<K, Integer> key, AccumT accumulator, Context c) {
- return keyedFnWithContext.compact(key.getKey(), accumulator, c);
+ public AccumT compact(AccumT accumulator, Context c) {
+ return fnWithContext.compact(accumulator, c);
}
@Override
- public AccumT extractOutput(KV<K, Integer> key, AccumT accumulator, Context c) {
+ public AccumT extractOutput(AccumT accumulator, Context c) {
return accumulator;
}
@Override
@SuppressWarnings("unchecked")
public Coder<AccumT> getAccumulatorCoder(
- CoderRegistry registry, Coder<KV<K, Integer>> keyCoder, Coder<InputT> inputCoder)
+ CoderRegistry registry, Coder<InputT> inputCoder)
throws CannotProvideCoderException {
return accumCoder;
}
+
@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder.delegate(PerKeyWithHotKeyFanout.this);
}
};
postCombine =
- new KeyedCombineFnWithContext<K, InputOrAccum<InputT, AccumT>, AccumT, OutputT>() {
+ new CombineFnWithContext<InputOrAccum<InputT, AccumT>, AccumT, OutputT>() {
@Override
- public AccumT createAccumulator(K key, Context c) {
- return keyedFnWithContext.createAccumulator(key, c);
+ public AccumT createAccumulator(Context c) {
+ return fnWithContext.createAccumulator(c);
}
+
@Override
public AccumT addInput(
- K key, AccumT accumulator, InputOrAccum<InputT, AccumT> value, Context c) {
+ AccumT accumulator, InputOrAccum<InputT, AccumT> value, Context c) {
if (value.accum == null) {
- return keyedFnWithContext.addInput(key, accumulator, value.input, c);
+ return fnWithContext.addInput(accumulator, value.input, c);
} else {
- return keyedFnWithContext.mergeAccumulators(
- key, ImmutableList.of(accumulator, value.accum), c);
+ return fnWithContext.mergeAccumulators(
+ ImmutableList.of(accumulator, value.accum), c);
}
}
+
@Override
- public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, Context c) {
- return keyedFnWithContext.mergeAccumulators(key, accumulators, c);
+ public AccumT mergeAccumulators(Iterable<AccumT> accumulators, Context c) {
+ return fnWithContext.mergeAccumulators(accumulators, c);
}
+
@Override
- public AccumT compact(K key, AccumT accumulator, Context c) {
- return keyedFnWithContext.compact(key, accumulator, c);
+ public AccumT compact(AccumT accumulator, Context c) {
+ return fnWithContext.compact(accumulator, c);
}
+
@Override
- public OutputT extractOutput(K key, AccumT accumulator, Context c) {
- return keyedFnWithContext.extractOutput(key, accumulator, c);
+ public OutputT extractOutput(AccumT accumulator, Context c) {
+ return fnWithContext.extractOutput(accumulator, c);
}
+
@Override
public Coder<OutputT> getDefaultOutputCoder(
- CoderRegistry registry,
- Coder<K> keyCoder,
- Coder<InputOrAccum<InputT, AccumT>> accumulatorCoder)
+ CoderRegistry registry, Coder<InputOrAccum<InputT, AccumT>> accumulatorCoder)
throws CannotProvideCoderException {
- return keyedFnWithContext.getDefaultOutputCoder(
- registry, keyCoder, inputCoder.getValueCoder());
+ return fnWithContext.getDefaultOutputCoder(registry, inputCoder.getValueCoder());
}
@Override
- public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder,
- Coder<InputOrAccum<InputT, AccumT>> inputCoder)
+ public Coder<AccumT> getAccumulatorCoder(
+ CoderRegistry registry, Coder<InputOrAccum<InputT, AccumT>> inputCoder)
throws CannotProvideCoderException {
return accumCoder;
}
+
@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder.delegate(PerKeyWithHotKeyFanout.this);
@@ -2202,25 +1872,33 @@ public class Combine {
}
// Combine the hot and cold keys separately.
- PCollection<KV<K, InputOrAccum<InputT, AccumT>>> precombinedHot = split
- .get(hot)
- .setCoder(KvCoder.of(KvCoder.of(inputCoder.getKeyCoder(), VarIntCoder.of()),
- inputCoder.getValueCoder()))
- .setWindowingStrategyInternal(preCombineStrategy)
- .apply("PreCombineHot", Combine.perKey(hotPreCombine, fnDisplayData))
- .apply("StripNonce", MapElements.via(
- new SimpleFunction<KV<KV<K, Integer>, AccumT>,
- KV<K, InputOrAccum<InputT, AccumT>>>() {
- @Override
- public KV<K, InputOrAccum<InputT, AccumT>> apply(KV<KV<K, Integer>, AccumT> elem) {
- return KV.of(
- elem.getKey().getKey(),
- InputOrAccum.<InputT, AccumT>accum(elem.getValue()));
- }
- }))
- .setCoder(KvCoder.of(inputCoder.getKeyCoder(), inputOrAccumCoder))
- .apply(Window.<KV<K, InputOrAccum<InputT, AccumT>>>remerge())
- .setWindowingStrategyInternal(input.getWindowingStrategy());
+ PCollection<KV<K, InputOrAccum<InputT, AccumT>>> precombinedHot =
+ split
+ .get(hot)
+ .setCoder(
+ KvCoder.of(
+ KvCoder.of(inputCoder.getKeyCoder(), VarIntCoder.of()),
+ inputCoder.getValueCoder()))
+ .setWindowingStrategyInternal(preCombineStrategy)
+ .apply(
+ "PreCombineHot",
+ Combine.<KV<K, Integer>, InputT, AccumT>perKey(hotPreCombine, fnDisplayData))
+ .apply(
+ "StripNonce",
+ MapElements.via(
+ new SimpleFunction<
+ KV<KV<K, Integer>, AccumT>, KV<K, InputOrAccum<InputT, AccumT>>>() {
+ @Override
+ public KV<K, InputOrAccum<InputT, AccumT>> apply(
+ KV<KV<K, Integer>, AccumT> elem) {
+ return KV.of(
+ elem.getKey().getKey(),
+ InputOrAccum.<InputT, AccumT>accum(elem.getValue()));
+ }
+ }))
+ .setCoder(KvCoder.of(inputCoder.getKeyCoder(), inputOrAccumCoder))
+ .apply(Window.<KV<K, InputOrAccum<InputT, AccumT>>>remerge())
+ .setWindowingStrategyInternal(input.getWindowingStrategy());
PCollection<KV<K, InputOrAccum<InputT, AccumT>>> preprocessedCold = split
.get(cold)
.setCoder(inputCoder)
@@ -2235,9 +1913,12 @@ public class Combine {
.setCoder(KvCoder.of(inputCoder.getKeyCoder(), inputOrAccumCoder));
// Combine the union of the pre-processed hot and cold key results.
- return PCollectionList.of(precombinedHot).and(preprocessedCold)
+ return PCollectionList.of(precombinedHot)
+ .and(preprocessedCold)
.apply(Flatten.<KV<K, InputOrAccum<InputT, AccumT>>>pCollections())
- .apply("PostCombine", Combine.perKey(postCombine, fnDisplayData));
+ .apply(
+ "PostCombine",
+ Combine.<K, InputOrAccum<InputT, AccumT>, OutputT>perKey(postCombine, fnDisplayData));
}
@Override
@@ -2325,71 +2006,61 @@ public class Combine {
/////////////////////////////////////////////////////////////////////////////
/**
- * {@code GroupedValues<K, InputT, OutputT>} takes a
- * {@code PCollection<KV<K, Iterable<InputT>>>}, such as the result of
- * {@link GroupByKey}, applies a specified
- * {@link KeyedCombineFn KeyedCombineFn<K, InputT, AccumT, OutputT>}
- * to each of the input {@code KV<K, Iterable<InputT>>} elements to
- * produce a combined output {@code KV<K, OutputT>} element, and returns a
- * {@code PCollection<KV<K, OutputT>>} containing all the combined output
- * elements. It is common for {@code InputT == OutputT}, but not required.
- * Common combining functions include sums, mins, maxes, and averages
- * of numbers, conjunctions and disjunctions of booleans, statistical
- * aggregations, etc.
+ * {@code GroupedValues<K, InputT, OutputT>} takes a {@code PCollection<KV<K, Iterable<InputT>>>},
+ * such as the result of {@link GroupByKey}, applies a specified {@link CombineFn
+ * CombineFn<InputT, AccumT, OutputT>} to each of the input {@code KV<K,
+ * Iterable<InputT>>} elements to produce a combined output {@code KV<K, OutputT>} element, and
+ * returns a {@code PCollection<KV<K, OutputT>>} containing all the combined output elements. It
+ * is common for {@code InputT == OutputT}, but not required. Common combining functions include
+ * sums, mins, maxes, and averages of numbers, conjunctions and disjunctions of booleans,
+ * statistical aggregations, etc.
*
* <p>Example of use:
- * <pre> {@code
+ *
+ * <pre>{@code
* PCollection<KV<String, Integer>> pc = ...;
* PCollection<KV<String, Iterable<Integer>>> groupedByKey = pc.apply(
* new GroupByKey<String, Integer>());
* PCollection<KV<String, Integer>> sumByKey = groupedByKey.apply(
* Combine.<String, Integer>groupedValues(
* new Sum.SumIntegerFn()));
- * } </pre>
+ * }
+ * </pre>
*
- * <p>See also {@link #perKey}/{@link PerKey Combine.PerKey}, which
- * captures the common pattern of "combining by key" in a
- * single easy-to-use {@code PTransform}.
+ * <p>See also {@link #perKey}/{@link PerKey Combine.PerKey}, which captures the common pattern of
+ * "combining by key" in a single easy-to-use {@code PTransform}.
*
- * <p>Combining for different keys can happen in parallel. Moreover,
- * combining of the {@code Iterable<InputT>} values associated a single
- * key can happen in parallel, with different subsets of the values
- * being combined separately, and their intermediate results combined
- * further, in an arbitrary tree reduction pattern, until a single
- * result value is produced for each key.
+ * <p>Combining for different keys can happen in parallel. Moreover, combining of the {@code
+ * Iterable<InputT>} values associated with one key can happen in parallel, with different subsets
+ * of the values being combined separately, and their intermediate results combined further, in an
+ * arbitrary tree reduction pattern, until a single result value is produced for each key.
*
- * <p>By default, the {@code Coder} of the keys of the output
- * {@code PCollection<KV<K, OutputT>>} is that of the keys of the input
- * {@code PCollection<KV<K, InputT>>}, and the {@code Coder} of the values
- * of the output {@code PCollection<KV<K, OutputT>>} is inferred from the
- * concrete type of the {@code KeyedCombineFn<K, InputT, AccumT, OutputT>}'s output
- * type {@code OutputT}.
+ * <p>By default, the {@code Coder} of the keys of the output {@code PCollection<KV<K, OutputT>>}
+ * is that of the keys of the input {@code PCollection<KV<K, InputT>>}, and the {@code Coder} of
+ * the values of the output {@code PCollection<KV<K, OutputT>>} is inferred from the concrete type
+ * of the {@code CombineFn<InputT, AccumT, OutputT>}'s output type {@code OutputT}.
*
- * <p>Each output element has the same timestamp and is in the same window
- * as its corresponding input element, and the output
- * {@code PCollection} has the same
- * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
- * associated with it as the input.
+ * <p>Each output element has the same timestamp and is in the same window as its corresponding
+ * input element, and the output {@code PCollection} has the same {@link
+ * org.apache.beam.sdk.transforms.windowing.WindowFn} associated with it as the input.
*
- * <p>See also {@link #globally}/{@link Globally Combine.Globally}, which
- * combines all the values in a {@code PCollection} into a
- * single value in a {@code PCollection}.
+ * <p>See also {@link #globally}/{@link Globally Combine.Globally}, which combines all the values
+ * in a {@code PCollection} into a single value in a {@code PCollection}.
*
* @param <K> type of input and output keys
* @param <InputT> type of input values
* @param <OutputT> type of output values
*/
public static class GroupedValues<K, InputT, OutputT>
- extends PTransform
- <PCollection<? extends KV<K, ? extends Iterable<InputT>>>,
- PCollection<KV<K, OutputT>>> {
+ extends PTransform<
+ PCollection<? extends KV<K, ? extends Iterable<InputT>>>, PCollection<KV<K, OutputT>>> {
- private final PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn;
+ private final GlobalCombineFn<? super InputT, ?, OutputT> fn;
private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
private final List<PCollectionView<?>> sideInputs;
private GroupedValues(
- PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
+ GlobalCombineFn<? super InputT, ?, OutputT> fn,
DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
this.fn = SerializableUtils.clone(fn);
this.fnDisplayData = fnDisplayData;
@@ -2397,7 +2068,7 @@ public class Combine {
}
private GroupedValues(
- PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
+ GlobalCombineFn<? super InputT, ?, OutputT> fn,
DisplayData.ItemSpec<? extends Class<?>> fnDisplayData,
List<PCollectionView<?>> sideInputs) {
this.fn = SerializableUtils.clone(fn);
@@ -2415,9 +2086,9 @@ public class Combine {
}
/**
- * Returns the KeyedCombineFn used by this Combine operation.
+ * Returns the {@link GlobalCombineFn} used by this Combine operation.
*/
- public PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> getFn() {
+ public GlobalCombineFn<? super InputT, ?, OutputT> getFn() {
return fn;
}
@@ -2436,9 +2107,9 @@ public class Combine {
K key = c.element().getKey();
OutputT output;
- if (fn instanceof KeyedCombineFnWithContext) {
- output = ((KeyedCombineFnWithContext<? super K, ? super InputT, ?, OutputT>) fn)
- .apply(key, c.element().getValue(), new CombineWithContext.Context() {
+ if (fn instanceof CombineFnWithContext) {
+ output = ((CombineFnWithContext<? super InputT, ?, OutputT>) fn)
+ .apply(c.element().getValue(), new CombineWithContext.Context() {
@Override
public PipelineOptions getPipelineOptions() {
return c.getPipelineOptions();
@@ -2449,9 +2120,9 @@ public class Combine {
return c.sideInput(view);
}
});
- } else if (fn instanceof KeyedCombineFn) {
- output = ((KeyedCombineFn<? super K, ? super InputT, ?, OutputT>) fn)
- .apply(key, c.element().getValue());
+ } else if (fn instanceof CombineFn) {
+ output = ((CombineFn<? super InputT, ?, OutputT>) fn)
+ .apply(c.element().getValue());
} else {
throw new IllegalStateException(
String.format("Unknown type of CombineFn: %s", fn.getClass()));
@@ -2516,10 +2187,9 @@ public class Combine {
KvCoder<K, InputT> kvCoder = getKvCoder(input.getCoder());
@SuppressWarnings("unchecked")
Coder<OutputT> outputValueCoder =
- ((PerKeyCombineFn<K, InputT, ?, OutputT>) fn)
- .getDefaultOutputCoder(
- input.getPipeline().getCoderRegistry(),
- kvCoder.getKeyCoder(), kvCoder.getValueCoder());
+ ((GlobalCombineFn<InputT, ?, OutputT>) fn)
+ .getDefaultOutputCoder(
+ input.getPipeline().getCoderRegistry(), kvCoder.getValueCoder());
return KvCoder.of(kvCoder.getKeyCoder(), outputValueCoder);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java
index 770a390..a881099 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java
@@ -25,9 +25,7 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -92,65 +90,6 @@ public class CombineFnBase {
*/
OutputT defaultValue();
- /**
- * Converts this {@code GloballyCombineFn} into an equivalent
- * {@link PerKeyCombineFn} that ignores the keys passed to it and
- * combines the values according to this {@code GloballyCombineFn}.
- *
- * @param <K> the type of the (ignored) keys
- */
- <K> PerKeyCombineFn<K, InputT, AccumT, OutputT> asKeyedFn();
- }
-
- /**
- * A {@code PerKeyCombineFn<K, InputT, AccumT, OutputT>} specifies how to combine
- * a collection of input values of type {@code InputT}, associated with
- * a key of type {@code K}, into a single output value of type
- * {@code OutputT}. It does this via one or more intermediate mutable
- * accumulator values of type {@code AccumT}.
- *
- * <p>Do not implement this interface directly.
- * Extends {@link KeyedCombineFn} and {@link KeyedCombineFnWithContext} instead.
- *
- * @param <K> type of keys
- * @param <InputT> type of input values
- * @param <AccumT> type of mutable accumulator values
- * @param <OutputT> type of output values
- */
- public interface PerKeyCombineFn<K, InputT, AccumT, OutputT>
- extends Serializable, HasDisplayData {
- /**
- * Returns the {@code Coder} to use for accumulator {@code AccumT}
- * values, or null if it is not able to be inferred.
- *
- * <p>By default, uses the knowledge of the {@code Coder} being
- * used for {@code K} keys and input {@code InputT} values and the
- * enclosing {@code Pipeline}'s {@code CoderRegistry} to try to
- * infer the Coder for {@code AccumT} values.
- *
- * <p>This is the Coder used to send data through a communication-intensive
- * shuffle step, so a compact and efficient representation may have
- * significant performance benefits.
- */
- Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder,
- Coder<InputT> inputCoder) throws CannotProvideCoderException;
-
- /**
- * Returns the {@code Coder} to use by default for output
- * {@code OutputT} values, or null if it is not able to be inferred.
- *
- * <p>By default, uses the knowledge of the {@code Coder} being
- * used for {@code K} keys and input {@code InputT} values and the
- * enclosing {@code Pipeline}'s {@code CoderRegistry} to try to
- * infer the Coder for {@code OutputT} values.
- */
- Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<K> keyCoder,
- Coder<InputT> inputCoder) throws CannotProvideCoderException;
-
- /**
- * Returns the a regular {@link GlobalCombineFn} that operates on a specific key.
- */
- GlobalCombineFn<InputT, AccumT, OutputT> forKey(K key, Coder<K> keyCoder);
}
/**
@@ -228,79 +167,4 @@ public class CombineFnBase {
public void populateDisplayData(DisplayData.Builder builder) {
}
}
-
- /**
- * An abstract {@link PerKeyCombineFn} base class shared by
- * {@link KeyedCombineFn} and {@link KeyedCombineFnWithContext}.
- *
- * <p>Do not extends this class directly.
- * Extends {@link KeyedCombineFn} and {@link KeyedCombineFnWithContext} instead.
- *
- * @param <K> type of keys
- * @param <InputT> type of input values
- * @param <AccumT> type of mutable accumulator values
- * @param <OutputT> type of output values
- */
- abstract static class AbstractPerKeyCombineFn<K, InputT, AccumT, OutputT>
- implements PerKeyCombineFn<K, InputT, AccumT, OutputT> {
- @Override
- public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder,
- Coder<InputT> inputCoder) throws CannotProvideCoderException {
- return registry.getDefaultCoder(getClass(), AbstractPerKeyCombineFn.class,
- ImmutableMap.<Type, Coder<?>>of(
- getKTypeVariable(), keyCoder, getInputTVariable(), inputCoder),
- getAccumTVariable());
- }
-
- @Override
- public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<K> keyCoder,
- Coder<InputT> inputCoder) throws CannotProvideCoderException {
- return registry.getDefaultCoder(getClass(), AbstractPerKeyCombineFn.class,
- ImmutableMap.<Type, Coder<?>>of(getKTypeVariable(), keyCoder, getInputTVariable(),
- inputCoder, getAccumTVariable(),
- this.getAccumulatorCoder(registry, keyCoder, inputCoder)),
- getOutputTVariable());
- }
-
- /**
- * Returns the {@link TypeVariable} of {@code K}.
- */
- public TypeVariable<?> getKTypeVariable() {
- return (TypeVariable<?>) new TypeDescriptor<K>(AbstractPerKeyCombineFn.class) {}.getType();
- }
-
- /**
- * Returns the {@link TypeVariable} of {@code InputT}.
- */
- public TypeVariable<?> getInputTVariable() {
- return (TypeVariable<?>)
- new TypeDescriptor<InputT>(AbstractPerKeyCombineFn.class) {}.getType();
- }
-
- /**
- * Returns the {@link TypeVariable} of {@code AccumT}.
- */
- public TypeVariable<?> getAccumTVariable() {
- return (TypeVariable<?>)
- new TypeDescriptor<AccumT>(AbstractPerKeyCombineFn.class) {}.getType();
- }
-
- /**
- * Returns the {@link TypeVariable} of {@code OutputT}.
- */
- public TypeVariable<?> getOutputTVariable() {
- return (TypeVariable<?>)
- new TypeDescriptor<OutputT>(AbstractPerKeyCombineFn.class) {}.getType();
- }
-
- /**
- * {@inheritDoc}
- *
- * <p>By default, does not register any display data. Implementors may override this method
- * to provide their own display data.
- */
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- }
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
index ca939c1..cc02dcf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
@@ -37,12 +37,9 @@ import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
-import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.CombineWithContext.Context;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.util.CombineFnUtil;
@@ -54,49 +51,6 @@ import org.apache.beam.sdk.values.TupleTag;
public class CombineFns {
/**
- * Returns a {@link ComposeKeyedCombineFnBuilder} to construct a composed
- * {@link PerKeyCombineFn}.
- *
- * <p>The same {@link TupleTag} cannot be used in a composition multiple times.
- *
- * <p>Example:
- * <pre>{@code
- * PCollection<KV<K, Integer>> latencies = ...;
- *
- * TupleTag<Integer> maxLatencyTag = new TupleTag<Integer>();
- * TupleTag<Double> meanLatencyTag = new TupleTag<Double>();
- *
- * SimpleFunction<Integer, Integer> identityFn =
- * new SimpleFunction<Integer, Integer>() {
- * {@literal @}Override
- * public Integer apply(Integer input) {
- * return input;
- * }};
- * PCollection<KV<K, CoCombineResult>> maxAndMean = latencies.apply(
- * Combine.perKey(
- * CombineFns.composeKeyed()
- * .with(identityFn, new MaxIntegerFn(), maxLatencyTag)
- * .with(identityFn, new MeanFn<Integer>(), meanLatencyTag)));
- *
- * PCollection<T> finalResultCollection = maxAndMean
- * .apply(ParDo.of(
- * new DoFn<KV<K, CoCombineResult>, T>() {
- * {@literal @}ProcessElement
- * public void processElement(ProcessContext c) throws Exception {
- * KV<K, CoCombineResult> e = c.element();
- * Integer maxLatency = e.getValue().get(maxLatencyTag);
- * Double meanLatency = e.getValue().get(meanLatencyTag);
- * .... Do Something ....
- * c.output(...some T...);
- * }
- * }));
- * }</pre>
- */
- public static ComposeKeyedCombineFnBuilder composeKeyed() {
- return new ComposeKeyedCombineFnBuilder();
- }
-
- /**
* Returns a {@link ComposeCombineFnBuilder} to construct a composed
* {@link GlobalCombineFn}.
*
@@ -142,67 +96,6 @@ public class CombineFns {
/////////////////////////////////////////////////////////////////////////////
/**
- * A builder class to construct a composed {@link PerKeyCombineFn}.
- */
- public static class ComposeKeyedCombineFnBuilder {
- /**
- * Returns a {@link ComposedKeyedCombineFn} that can take additional
- * {@link PerKeyCombineFn PerKeyCombineFns} and apply them as a single combine function.
- *
- * <p>The {@link ComposedKeyedCombineFn} extracts inputs from {@code DataT} with
- * the {@code extractInputFn} and combines them with the {@code keyedCombineFn},
- * and then it outputs each combined value with a {@link TupleTag} to a
- * {@link CoCombineResult}.
- */
- public <K, DataT, InputT, OutputT> ComposedKeyedCombineFn<DataT, K> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- KeyedCombineFn<K, InputT, ?, OutputT> keyedCombineFn,
- TupleTag<OutputT> outputTag) {
- return new ComposedKeyedCombineFn<DataT, K>()
- .with(extractInputFn, keyedCombineFn, outputTag);
- }
-
- /**
- * Returns a {@link ComposedKeyedCombineFnWithContext} that can take additional
- * {@link PerKeyCombineFn PerKeyCombineFns} and apply them as a single combine function.
- *
- * <p>The {@link ComposedKeyedCombineFnWithContext} extracts inputs from {@code DataT} with
- * the {@code extractInputFn} and combines them with the {@code keyedCombineFnWithContext},
- * and then it outputs each combined value with a {@link TupleTag} to a
- * {@link CoCombineResult}.
- */
- public <K, DataT, InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- KeyedCombineFnWithContext<K, InputT, ?, OutputT> keyedCombineFnWithContext,
- TupleTag<OutputT> outputTag) {
- return new ComposedKeyedCombineFnWithContext<DataT, K>()
- .with(extractInputFn, keyedCombineFnWithContext, outputTag);
- }
-
- /**
- * Returns a {@link ComposedKeyedCombineFn} that can take additional
- * {@link PerKeyCombineFn PerKeyCombineFns} and apply them as a single combine function.
- */
- public <K, DataT, InputT, OutputT> ComposedKeyedCombineFn<DataT, K> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- CombineFn<InputT, ?, OutputT> combineFn,
- TupleTag<OutputT> outputTag) {
- return with(extractInputFn, combineFn.<K>asKeyedFn(), outputTag);
- }
-
- /**
- * Returns a {@link ComposedKeyedCombineFnWithContext} that can take additional
- * {@link PerKeyCombineFn PerKeyCombineFns} and apply them as a single combine function.
- */
- public <K, DataT, InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- CombineFnWithContext<InputT, ?, OutputT> combineFnWithContext,
- TupleTag<OutputT> outputTag) {
- return with(extractInputFn, combineFnWithContext.<K>asKeyedFn(), outputTag);
- }
- }
-
- /**
* A builder class to construct a composed {@link GlobalCombineFn}.
*/
public static class ComposeCombineFnBuilder {
@@ -246,7 +139,7 @@ public class CombineFns {
/**
* A tuple of outputs produced by a composed combine functions.
*
- * <p>See {@link #compose()} or {@link #composeKeyed()}) for details.
+ * <p>See {@link #compose()} for details.
*/
public static class CoCombineResult implements Serializable {
@@ -598,345 +491,6 @@ public class CombineFns {
}
}
- /**
- * A composed {@link KeyedCombineFn} that applies multiple {@link KeyedCombineFn KeyedCombineFns}.
- *
- * <p>For each {@link KeyedCombineFn} it extracts inputs from {@code DataT} with
- * the {@code extractInputFn} and combines them,
- * and then it outputs each combined value with a {@link TupleTag} to a
- * {@link CoCombineResult}.
- */
- public static class ComposedKeyedCombineFn<DataT, K>
- extends KeyedCombineFn<K, DataT, Object[], CoCombineResult> {
-
- private final List<SerializableFunction<DataT, Object>> extractInputFns;
- private final List<KeyedCombineFn<K, Object, Object, Object>> keyedCombineFns;
- private final List<TupleTag<?>> outputTags;
- private final int combineFnCount;
-
- private ComposedKeyedCombineFn() {
- this.extractInputFns = ImmutableList.of();
- this.keyedCombineFns = ImmutableList.of();
- this.outputTags = ImmutableList.of();
- this.combineFnCount = 0;
- }
-
- private ComposedKeyedCombineFn(
- ImmutableList<SerializableFunction<DataT, ?>> extractInputFns,
- ImmutableList<KeyedCombineFn<K, ?, ?, ?>> keyedCombineFns,
- ImmutableList<TupleTag<?>> outputTags) {
- @SuppressWarnings({"unchecked", "rawtypes"})
- List<SerializableFunction<DataT, Object>> castedExtractInputFns = (List) extractInputFns;
- this.extractInputFns = castedExtractInputFns;
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- List<KeyedCombineFn<K, Object, Object, Object>> castedKeyedCombineFns =
- (List) keyedCombineFns;
- this.keyedCombineFns = castedKeyedCombineFns;
- this.outputTags = outputTags;
- this.combineFnCount = this.keyedCombineFns.size();
- }
-
- /**
- * Returns a {@link ComposedKeyedCombineFn} with an additional {@link KeyedCombineFn}.
- */
- public <InputT, OutputT> ComposedKeyedCombineFn<DataT, K> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- KeyedCombineFn<K, InputT, ?, OutputT> keyedCombineFn,
- TupleTag<OutputT> outputTag) {
- checkUniqueness(outputTags, outputTag);
- return new ComposedKeyedCombineFn<>(
- ImmutableList.<SerializableFunction<DataT, ?>>builder()
- .addAll(extractInputFns)
- .add(extractInputFn)
- .build(),
- ImmutableList.<KeyedCombineFn<K, ?, ?, ?>>builder()
- .addAll(keyedCombineFns)
- .add(keyedCombineFn)
- .build(),
- ImmutableList.<TupleTag<?>>builder()
- .addAll(outputTags)
- .add(outputTag)
- .build());
- }
-
- /**
- * Returns a {@link ComposedKeyedCombineFnWithContext} with an additional
- * {@link KeyedCombineFnWithContext}.
- */
- public <InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- KeyedCombineFnWithContext<K, InputT, ?, OutputT> keyedCombineFn,
- TupleTag<OutputT> outputTag) {
- checkUniqueness(outputTags, outputTag);
- List<KeyedCombineFnWithContext<K, Object, Object, Object>> fnsWithContext =
- Lists.newArrayList();
- for (KeyedCombineFn<K, Object, Object, Object> fn : keyedCombineFns) {
- fnsWithContext.add(CombineFnUtil.toFnWithContext(fn));
- }
- return new ComposedKeyedCombineFnWithContext<>(
- ImmutableList.<SerializableFunction<DataT, ?>>builder()
- .addAll(extractInputFns)
- .add(extractInputFn)
- .build(),
- ImmutableList.<KeyedCombineFnWithContext<K, ?, ?, ?>>builder()
- .addAll(fnsWithContext)
- .add(keyedCombineFn)
- .build(),
- ImmutableList.<TupleTag<?>>builder()
- .addAll(outputTags)
- .add(outputTag)
- .build());
- }
-
- /**
- * Returns a {@link ComposedKeyedCombineFn} with an additional {@link CombineFn}.
- */
- public <InputT, OutputT> ComposedKeyedCombineFn<DataT, K> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- CombineFn<InputT, ?, OutputT> keyedCombineFn,
- TupleTag<OutputT> outputTag) {
- return with(extractInputFn, keyedCombineFn.<K>asKeyedFn(), outputTag);
- }
-
- /**
- * Returns a {@link ComposedKeyedCombineFnWithContext} with an additional
- * {@link CombineFnWithContext}.
- */
- public <InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- CombineFnWithContext<InputT, ?, OutputT> keyedCombineFn,
- TupleTag<OutputT> outputTag) {
- return with(extractInputFn, keyedCombineFn.<K>asKeyedFn(), outputTag);
- }
-
- @Override
- public Object[] createAccumulator(K key) {
- Object[] accumsArray = new Object[combineFnCount];
- for (int i = 0; i < combineFnCount; ++i) {
- accumsArray[i] = keyedCombineFns.get(i).createAccumulator(key);
- }
- return accumsArray;
- }
-
- @Override
- public Object[] addInput(K key, Object[] accumulator, DataT value) {
- for (int i = 0; i < combineFnCount; ++i) {
- Object input = extractInputFns.get(i).apply(value);
- accumulator[i] = keyedCombineFns.get(i).addInput(key, accumulator[i], input);
- }
- return accumulator;
- }
-
- @Override
- public Object[] mergeAccumulators(K key, final Iterable<Object[]> accumulators) {
- Iterator<Object[]> iter = accumulators.iterator();
- if (!iter.hasNext()) {
- return createAccumulator(key);
- } else {
- // Reuses the first accumulator, and overwrites its values.
- // It is safe because {@code accum[i]} only depends on
- // the i-th component of each accumulator.
- Object[] accum = iter.next();
- for (int i = 0; i < combineFnCount; ++i) {
- accum[i] = keyedCombineFns.get(i).mergeAccumulators(
- key, new ProjectionIterable(accumulators, i));
- }
- return accum;
- }
- }
-
- @Override
- public CoCombineResult extractOutput(K key, Object[] accumulator) {
- Map<TupleTag<?>, Object> valuesMap = Maps.newHashMap();
- for (int i = 0; i < combineFnCount; ++i) {
- valuesMap.put(
- outputTags.get(i),
- keyedCombineFns.get(i).extractOutput(key, accumulator[i]));
- }
- return new CoCombineResult(valuesMap);
- }
-
- @Override
- public Object[] compact(K key, Object[] accumulator) {
- for (int i = 0; i < combineFnCount; ++i) {
- accumulator[i] = keyedCombineFns.get(i).compact(key, accumulator[i]);
- }
- return accumulator;
- }
-
- @Override
- public Coder<Object[]> getAccumulatorCoder(
- CoderRegistry registry, Coder<K> keyCoder, Coder<DataT> dataCoder)
- throws CannotProvideCoderException {
- List<Coder<Object>> coders = Lists.newArrayList();
- for (int i = 0; i < combineFnCount; ++i) {
- Coder<Object> inputCoder =
- registry.getDefaultOutputCoder(extractInputFns.get(i), dataCoder);
- coders.add(keyedCombineFns.get(i).getAccumulatorCoder(registry, keyCoder, inputCoder));
- }
- return new ComposedAccumulatorCoder(coders);
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- CombineFns.populateDisplayData(builder, keyedCombineFns);
- }
- }
-
- /**
- * A composed {@link KeyedCombineFnWithContext} that applies multiple
- * {@link KeyedCombineFnWithContext KeyedCombineFnWithContexts}.
- *
- * <p>For each {@link KeyedCombineFnWithContext} it extracts inputs from {@code DataT} with
- * the {@code extractInputFn} and combines them,
- * and then it outputs each combined value with a {@link TupleTag} to a
- * {@link CoCombineResult}.
- */
- public static class ComposedKeyedCombineFnWithContext<DataT, K>
- extends KeyedCombineFnWithContext<K, DataT, Object[], CoCombineResult> {
-
- private final List<SerializableFunction<DataT, Object>> extractInputFns;
- private final List<KeyedCombineFnWithContext<K, Object, Object, Object>> keyedCombineFns;
- private final List<TupleTag<?>> outputTags;
- private final int combineFnCount;
-
- private ComposedKeyedCombineFnWithContext() {
- this.extractInputFns = ImmutableList.of();
- this.keyedCombineFns = ImmutableList.of();
- this.outputTags = ImmutableList.of();
- this.combineFnCount = 0;
- }
-
- private ComposedKeyedCombineFnWithContext(
- ImmutableList<SerializableFunction<DataT, ?>> extractInputFns,
- ImmutableList<KeyedCombineFnWithContext<K, ?, ?, ?>> keyedCombineFns,
- ImmutableList<TupleTag<?>> outputTags) {
- @SuppressWarnings({"unchecked", "rawtypes"})
- List<SerializableFunction<DataT, Object>> castedExtractInputFns =
- (List) extractInputFns;
- this.extractInputFns = castedExtractInputFns;
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- List<KeyedCombineFnWithContext<K, Object, Object, Object>> castedKeyedCombineFns =
- (List) keyedCombineFns;
- this.keyedCombineFns = castedKeyedCombineFns;
- this.outputTags = outputTags;
- this.combineFnCount = this.keyedCombineFns.size();
- }
-
- /**
- * Returns a {@link ComposedKeyedCombineFnWithContext} with an additional
- * {@link PerKeyCombineFn}.
- */
- public <InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- PerKeyCombineFn<K, InputT, ?, OutputT> perKeyCombineFn,
- TupleTag<OutputT> outputTag) {
- checkUniqueness(outputTags, outputTag);
- return new ComposedKeyedCombineFnWithContext<>(
- ImmutableList.<SerializableFunction<DataT, ?>>builder()
- .addAll(extractInputFns)
- .add(extractInputFn)
- .build(),
- ImmutableList.<KeyedCombineFnWithContext<K, ?, ?, ?>>builder()
- .addAll(keyedCombineFns)
- .add(CombineFnUtil.toFnWithContext(perKeyCombineFn))
- .build(),
- ImmutableList.<TupleTag<?>>builder()
- .addAll(outputTags)
- .add(outputTag)
- .build());
- }
-
- /**
- * Returns a {@link ComposedKeyedCombineFnWithContext} with an additional
- * {@link GlobalCombineFn}.
- */
- public <InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- GlobalCombineFn<InputT, ?, OutputT> perKeyCombineFn,
- TupleTag<OutputT> outputTag) {
- return with(extractInputFn, perKeyCombineFn.<K>asKeyedFn(), outputTag);
- }
-
- @Override
- public Object[] createAccumulator(K key, Context c) {
- Object[] accumsArray = new Object[combineFnCount];
- for (int i = 0; i < combineFnCount; ++i) {
- accumsArray[i] = keyedCombineFns.get(i).createAccumulator(key, c);
- }
- return accumsArray;
- }
-
- @Override
- public Object[] addInput(K key, Object[] accumulator, DataT value, Context c) {
- for (int i = 0; i < combineFnCount; ++i) {
- Object input = extractInputFns.get(i).apply(value);
- accumulator[i] = keyedCombineFns.get(i).addInput(key, accumulator[i], input, c);
- }
- return accumulator;
- }
-
- @Override
- public Object[] mergeAccumulators(K key, Iterable<Object[]> accumulators, Context c) {
- Iterator<Object[]> iter = accumulators.iterator();
- if (!iter.hasNext()) {
- return createAccumulator(key, c);
- } else {
- // Reuses the first accumulator, and overwrites its values.
- // It is safe because {@code accum[i]} only depends on
- // the i-th component of each accumulator.
- Object[] accum = iter.next();
- for (int i = 0; i < combineFnCount; ++i) {
- accum[i] = keyedCombineFns.get(i).mergeAccumulators(
- key, new ProjectionIterable(accumulators, i), c);
- }
- return accum;
- }
- }
-
- @Override
- public CoCombineResult extractOutput(K key, Object[] accumulator, Context c) {
- Map<TupleTag<?>, Object> valuesMap = Maps.newHashMap();
- for (int i = 0; i < combineFnCount; ++i) {
- valuesMap.put(
- outputTags.get(i),
- keyedCombineFns.get(i).extractOutput(key, accumulator[i], c));
- }
- return new CoCombineResult(valuesMap);
- }
-
- @Override
- public Object[] compact(K key, Object[] accumulator, Context c) {
- for (int i = 0; i < combineFnCount; ++i) {
- accumulator[i] = keyedCombineFns.get(i).compact(key, accumulator[i], c);
- }
- return accumulator;
- }
-
- @Override
- public Coder<Object[]> getAccumulatorCoder(
- CoderRegistry registry, Coder<K> keyCoder, Coder<DataT> dataCoder)
- throws CannotProvideCoderException {
- List<Coder<Object>> coders = Lists.newArrayList();
- for (int i = 0; i < combineFnCount; ++i) {
- Coder<Object> inputCoder =
- registry.getDefaultOutputCoder(extractInputFns.get(i), dataCoder);
- coders.add(keyedCombineFns.get(i).getAccumulatorCoder(
- registry, keyCoder, inputCoder));
- }
- return new ComposedAccumulatorCoder(coders);
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- CombineFns.populateDisplayData(builder, keyedCombineFns);
- }
- }
-
/////////////////////////////////////////////////////////////////////////////
private static class ProjectionIterable implements Iterable<Object> {
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
index cd0600a..9ae19f8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
@@ -17,20 +17,15 @@
*/
package org.apache.beam.sdk.transforms;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PCollectionView;
/**
* This class contains combine functions that have access to {@code PipelineOptions} and side inputs
* through {@code CombineWithContext.Context}.
*
- * <p>{@link CombineFnWithContext} and {@link KeyedCombineFnWithContext} are for users to extend.
+ * <p>{@link CombineFnWithContext} is for users to extend.
*/
public class CombineWithContext {
@@ -116,170 +111,23 @@ public class CombineWithContext {
return accumulator;
}
- @Override
- public OutputT defaultValue() {
- throw new UnsupportedOperationException(
- "Override this function to provide the default value.");
- }
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- @Override
- public <K> KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> asKeyedFn() {
- // The key, an object, is never even looked at.
- return new KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>() {
- @Override
- public AccumT createAccumulator(K key, Context c) {
- return CombineFnWithContext.this.createAccumulator(c);
- }
-
- @Override
- public AccumT addInput(K key, AccumT accumulator, InputT input, Context c) {
- return CombineFnWithContext.this.addInput(accumulator, input, c);
- }
-
- @Override
- public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, Context c) {
- return CombineFnWithContext.this.mergeAccumulators(accumulators, c);
- }
-
- @Override
- public OutputT extractOutput(K key, AccumT accumulator, Context c) {
- return CombineFnWithContext.this.extractOutput(accumulator, c);
- }
-
- @Override
- public AccumT compact(K key, AccumT accumulator, Context c) {
- return CombineFnWithContext.this.compact(accumulator, c);
- }
-
- @Override
- public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder,
- Coder<InputT> inputCoder) throws CannotProvideCoderException {
- return CombineFnWithContext.this.getAccumulatorCoder(registry, inputCoder);
- }
-
- @Override
- public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<K> keyCoder,
- Coder<InputT> inputCoder) throws CannotProvideCoderException {
- return CombineFnWithContext.this.getDefaultOutputCoder(registry, inputCoder);
- }
-
- @Override
- public CombineFnWithContext<InputT, AccumT, OutputT> forKey(K key, Coder<K> keyCoder) {
- return CombineFnWithContext.this;
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- builder.delegate(CombineFnWithContext.this);
- }
- };
- }
- }
-
- /**
- * A keyed combine function that has access to {@code PipelineOptions} and side inputs through
- * {@code CombineWithContext.Context}.
- *
- * <p>See the equivalent {@link KeyedCombineFn} for details about keyed combine functions.
- */
- public abstract static class KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>
- extends CombineFnBase.AbstractPerKeyCombineFn<K, InputT, AccumT, OutputT>
- implements RequiresContextInternal {
- /**
- * Returns a new, mutable accumulator value representing the accumulation of zero input values.
- *
- * <p>It is equivalent to {@link KeyedCombineFn#createAccumulator},
- * but it has additional access to {@code CombineWithContext.Context}.
- */
- public abstract AccumT createAccumulator(K key, Context c);
-
/**
- * Adds the given input value to the given accumulator, returning the new accumulator value.
- *
- * <p>It is equivalent to {@link KeyedCombineFn#addInput}, but it has additional access to
- * {@code CombineWithContext.Context}.
+ * Applies this {@code CombineFnWithContext} to a collection of input values to produce a
+ * combined output value.
*/
- public abstract AccumT addInput(K key, AccumT accumulator, InputT value, Context c);
-
- /**
- * Returns an accumulator representing the accumulation of all the
- * input values accumulated in the merging accumulators.
- *
- * <p>It is equivalent to {@link KeyedCombineFn#mergeAccumulators},
- * but it has additional access to {@code CombineWithContext.Context}..
- */
- public abstract AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, Context c);
-
- /**
- * Returns the output value that is the result of combining all
- * the input values represented by the given accumulator.
- *
- * <p>It is equivalent to {@link KeyedCombineFn#extractOutput}, but it has additional access to
- * {@code CombineWithContext.Context}.
- */
- public abstract OutputT extractOutput(K key, AccumT accumulator, Context c);
-
- /**
- * Returns an accumulator that represents the same logical value as the
- * input accumulator, but may have a more compact representation.
- *
- * <p>It is equivalent to {@link KeyedCombineFn#compact}, but it has additional access to
- * {@code CombineWithContext.Context}.
- */
- public AccumT compact(K key, AccumT accumulator, Context c) {
- return accumulator;
- }
-
- /**
- * Applies this {@code KeyedCombineFnWithContext} to a key and a collection
- * of input values to produce a combined output value.
- */
- public OutputT apply(K key, Iterable<? extends InputT> inputs, Context c) {
- AccumT accum = createAccumulator(key, c);
+ public OutputT apply(Iterable<? extends InputT> inputs, Context c) {
+ AccumT accum = createAccumulator(c);
for (InputT input : inputs) {
- accum = addInput(key, accum, input, c);
+ accum = addInput(accum, input, c);
}
- return extractOutput(key, accum, c);
+ return extractOutput(accum, c);
}
@Override
- public CombineFnWithContext<InputT, AccumT, OutputT> forKey(
- final K key, final Coder<K> keyCoder) {
- return new CombineFnWithContext<InputT, AccumT, OutputT>() {
- @Override
- public AccumT createAccumulator(Context c) {
- return KeyedCombineFnWithContext.this.createAccumulator(key, c);
- }
-
- @Override
- public AccumT addInput(AccumT accumulator, InputT input, Context c) {
- return KeyedCombineFnWithContext.this.addInput(key, accumulator, input, c);
- }
-
- @Override
- public AccumT mergeAccumulators(Iterable<AccumT> accumulators, Context c) {
- return KeyedCombineFnWithContext.this.mergeAccumulators(key, accumulators, c);
- }
-
- @Override
- public OutputT extractOutput(AccumT accumulator, Context c) {
- return KeyedCombineFnWithContext.this.extractOutput(key, accumulator, c);
- }
-
- @Override
- public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<InputT> inputCoder)
- throws CannotProvideCoderException {
- return KeyedCombineFnWithContext.this.getAccumulatorCoder(registry, keyCoder, inputCoder);
- }
-
- @Override
- public Coder<OutputT> getDefaultOutputCoder(
- CoderRegistry registry, Coder<InputT> inputCoder) throws CannotProvideCoderException {
- return KeyedCombineFnWithContext.this.getDefaultOutputCoder(
- registry, keyCoder, inputCoder);
- }
- };
+ public OutputT defaultValue() {
+ throw new UnsupportedOperationException(
+ "Override this function to provide the default value.");
}
+
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
index 47be9b9..e42c0b2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
@@ -234,7 +234,7 @@ public class Top {
public static <K, V, ComparatorT extends Comparator<V> & Serializable>
PTransform<PCollection<KV<K, V>>, PCollection<KV<K, List<V>>>>
perKey(int count, ComparatorT compareFn) {
- return Combine.perKey(new TopCombineFn<>(count, compareFn).<K>asKeyedFn());
+ return Combine.perKey(new TopCombineFn<>(count, compareFn));
}
/**
@@ -280,7 +280,7 @@ public class Top {
public static <K, V extends Comparable<V>>
PTransform<PCollection<KV<K, V>>, PCollection<KV<K, List<V>>>>
smallestPerKey(int count) {
- return Combine.perKey(new TopCombineFn<>(count, new Smallest<V>()).<K>asKeyedFn());
+ return Combine.perKey(new TopCombineFn<>(count, new Smallest<V>()));
}
/**
@@ -326,7 +326,7 @@ public class Top {
public static <K, V extends Comparable<V>>
PerKey<K, V, List<V>>
largestPerKey(int count) {
- return Combine.perKey(new TopCombineFn<>(count, new Largest<V>()).<K>asKeyedFn());
+ return Combine.perKey(new TopCombineFn<>(count, new Largest<V>()));
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index 0495ad6..b3b8918 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -205,7 +205,7 @@ public class View {
* PCollection<KV<K, V>> input = ...
* CombineFn<V, OutputT> yourCombineFn = ...
* PCollectionView<Map<K, OutputT>> output = input
- * .apply(Combine.perKey(yourCombineFn.<K>asKeyedFn()))
+ * .apply(Combine.perKey(yourCombineFn))
* .apply(View.<K, OutputT>asMap());
* }</pre>
*