You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/01/03 22:36:37 UTC
[2/7] beam git commit: [BEAM-1223] Reduced [Sum|Min|Max|Mean]*Fn
viability so as to make them an internal implementation detail and prevent
external code from employing them for tasks such as pipeline translation.
[BEAM-1223] Reduced [Sum|Min|Max|Mean]*Fn viability so as to make them an internal implementation detail and prevent external code from employing them for tasks such as pipeline translation.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/78a360ea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/78a360ea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/78a360ea
Branch: refs/heads/master
Commit: 78a360eac35507d9a558fc6117bb56b67b8a884e
Parents: e794f14
Author: Stas Levin <st...@gmail.com>
Authored: Sun Jan 1 13:58:32 2017 +0200
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Jan 3 14:25:26 2017 -0800
----------------------------------------------------------------------
.../beam/examples/DebuggingWordCount.java | 4 +-
.../org/apache/beam/examples/WordCount.java | 2 +-
.../cookbook/CombinePerKeyExamples.java | 2 +-
.../beam/examples/complete/game/GameStats.java | 2 +-
.../beam/examples/complete/game/UserScore.java | 2 +-
.../runners/apex/examples/WordCountTest.java | 2 +-
.../utils/ApexStateInternalsTest.java | 2 +-
.../core/GroupAlsoByWindowViaWindowSetDoFn.java | 4 +-
.../runners/core/GroupAlsoByWindowsDoFn.java | 4 +-
.../apache/beam/runners/core/NonEmptyPanes.java | 2 +-
.../AfterDelayFromFirstElementStateMachine.java | 2 +-
.../core/triggers/AfterPaneStateMachine.java | 2 +-
.../core/LateDataDroppingDoFnRunnerTest.java | 2 +-
.../beam/runners/core/ReduceFnRunnerTest.java | 12 +-
.../beam/runners/core/ReduceFnTester.java | 2 +-
.../GroupAlsoByWindowEvaluatorFactory.java | 4 +-
.../runners/direct/AggregatorContainerTest.java | 16 +--
.../CopyOnAccessInMemoryStateInternalsTest.java | 4 +-
.../runners/direct/EvaluationContextTest.java | 6 +-
.../beam/runners/flink/examples/WordCount.java | 2 +-
.../flink/examples/streaming/AutoComplete.java | 2 +-
.../KafkaWindowedWordCountExample.java | 2 +-
.../examples/streaming/WindowedWordCount.java | 2 +-
.../streaming/FlinkStateInternalsTest.java | 2 +-
.../dataflow/DataflowPipelineJobTest.java | 8 +-
.../spark/aggregators/NamedAggregators.java | 4 +-
.../beam/runners/spark/examples/WordCount.java | 2 +-
.../translation/SparkGroupAlsoByWindowFn.java | 2 +-
.../spark/translation/SparkRuntimeContext.java | 63 ++++-------
.../ResumeFromCheckpointStreamingTest.java | 2 +-
.../streaming/utils/PAssertStreaming.java | 4 +-
.../apache/beam/sdk/io/PubsubUnboundedSink.java | 8 +-
.../beam/sdk/io/PubsubUnboundedSource.java | 5 +-
.../org/apache/beam/sdk/testing/PAssert.java | 12 +-
.../org/apache/beam/sdk/transforms/Max.java | 112 +++++++++++--------
.../org/apache/beam/sdk/transforms/Mean.java | 13 ++-
.../org/apache/beam/sdk/transforms/Min.java | 110 ++++++++++--------
.../org/apache/beam/sdk/transforms/Sum.java | 57 ++++++----
.../windowing/AfterDelayFromFirstElement.java | 2 +-
.../sdk/transforms/windowing/AfterPane.java | 2 +-
.../sdk/AggregatorPipelineExtractorTest.java | 16 +--
.../beam/sdk/transforms/CombineFnsTest.java | 20 ++--
.../apache/beam/sdk/transforms/DoFnTest.java | 15 ++-
.../beam/sdk/transforms/DoFnTesterTest.java | 6 +-
.../org/apache/beam/sdk/transforms/MaxTest.java | 6 +-
.../apache/beam/sdk/transforms/MeanTest.java | 2 +-
.../org/apache/beam/sdk/transforms/MinTest.java | 6 +-
.../beam/sdk/transforms/OldDoFnContextTest.java | 2 +-
.../apache/beam/sdk/transforms/OldDoFnTest.java | 11 +-
.../beam/sdk/transforms/SimpleStatsFnsTest.java | 36 +++---
.../org/apache/beam/sdk/transforms/SumTest.java | 12 +-
.../apache/beam/sdk/transforms/ViewTest.java | 2 +-
.../apache/beam/sdk/util/CombineFnUtilTest.java | 8 +-
.../util/state/InMemoryStateInternalsTest.java | 2 +-
.../beam/sdk/util/state/StateTagTest.java | 11 +-
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 2 +-
56 files changed, 339 insertions(+), 310 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
index f7c537c..997d590 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
@@ -95,9 +95,9 @@ public class DebuggingWordCount {
* in a dashboard, etc.
*/
private final Aggregator<Long, Long> matchedWords =
- createAggregator("matchedWords", new Sum.SumLongFn());
+ createAggregator("matchedWords", Sum.ofLongs());
private final Aggregator<Long, Long> unmatchedWords =
- createAggregator("umatchedWords", new Sum.SumLongFn());
+ createAggregator("umatchedWords", Sum.ofLongs());
@ProcessElement
public void processElement(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
index 224d7db..7e21d47 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
@@ -87,7 +87,7 @@ public class WordCount {
*/
static class ExtractWordsFn extends DoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", new Sum.SumLongFn());
+ createAggregator("emptyLines", Sum.ofLongs());
@ProcessElement
public void processElement(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
index 29655ea..37f9d79 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
@@ -80,7 +80,7 @@ public class CombinePerKeyExamples {
*/
static class ExtractLargeWordsFn extends DoFn<TableRow, KV<String, String>> {
private final Aggregator<Long, Long> smallerWords =
- createAggregator("smallerWords", new Sum.SumLongFn());
+ createAggregator("smallerWords", Sum.ofLongs());
@ProcessElement
public void processElement(ProcessContext c){
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index 6ad6a23..74f1b30 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -127,7 +127,7 @@ public class GameStats extends LeaderBoard {
.withSideInputs(globalMeanScore)
.of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
private final Aggregator<Long, Long> numSpammerUsers =
- createAggregator("SpammerUsers", new Sum.SumLongFn());
+ createAggregator("SpammerUsers", Sum.ofLongs());
@ProcessElement
public void processElement(ProcessContext c) {
Integer score = c.element().getValue();
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
index cb81a7e..7dd5a8e 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
@@ -126,7 +126,7 @@ public class UserScore {
// Log and count parse errors.
private static final Logger LOG = LoggerFactory.getLogger(ParseEventFn.class);
private final Aggregator<Long, Long> numParseErrors =
- createAggregator("ParseErrors", new Sum.SumLongFn());
+ createAggregator("ParseErrors", Sum.ofLongs());
@ProcessElement
public void processElement(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
index 28bb8ad..a1713ac 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
@@ -68,7 +68,7 @@ public class WordCountTest {
static class ExtractWordsFn extends DoFn<String, String> {
private static final long serialVersionUID = 1L;
private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", new Sum.SumLongFn());
+ createAggregator("emptyLines", Sum.ofLongs());
@ProcessElement
public void processElement(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
index 1801358..4d797f1 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
@@ -62,7 +62,7 @@ public class ApexStateInternalsTest {
StateTags.value("stringValue", StringUtf8Coder.of());
private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>>
SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
- "sumInteger", VarIntCoder.of(), new Sum.SumIntegerFn());
+ "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
StateTags.bag("stringBag", StringUtf8Coder.of());
private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index 14171b3..d79683a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -50,9 +50,9 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
protected final Aggregator<Long, Long> droppedDueToClosedWindow =
createAggregator(
- GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, new Sum.SumLongFn());
+ GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
protected final Aggregator<Long, Long> droppedDueToLateness =
- createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, new Sum.SumLongFn());
+ createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
private final WindowingStrategy<Object, W> windowingStrategy;
private final StateInternalsFactory<K> stateInternalsFactory;
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
index 1b32d84..9a2f8fd 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
@@ -41,7 +41,7 @@ public abstract class GroupAlsoByWindowsDoFn<K, InputT, OutputT, W extends Bound
public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness";
protected final Aggregator<Long, Long> droppedDueToClosedWindow =
- createAggregator(DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, new Sum.SumLongFn());
+ createAggregator(DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
protected final Aggregator<Long, Long> droppedDueToLateness =
- createAggregator(DROPPED_DUE_TO_LATENESS_COUNTER, new Sum.SumLongFn());
+ createAggregator(DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
index 3e51dfb..0a6fd93 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
@@ -121,7 +121,7 @@ public abstract class NonEmptyPanes<K, W extends BoundedWindow> {
private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Long>>
PANE_ADDITIONS_TAG =
StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
- "count", VarLongCoder.of(), new Sum.SumLongFn()));
+ "count", VarLongCoder.of(), Sum.ofLongs()));
@Override
public void recordContent(StateAccessor<K> state) {
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
index c8922df..b60c690 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
@@ -56,7 +56,7 @@ public abstract class AfterDelayFromFirstElementStateMachine extends OnceTrigger
protected static final StateTag<Object, AccumulatorCombiningState<Instant,
Combine.Holder<Instant>, Instant>> DELAYED_UNTIL_TAG =
StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
- "delayed", InstantCoder.of(), Min.MinFn.<Instant>naturalOrder()));
+ "delayed", InstantCoder.of(), Min.<Instant>naturalOrder()));
private static final PeriodFormatter PERIOD_FORMATTER = PeriodFormat.wordBased(Locale.ENGLISH);
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
index 38b95f9..d8ad370 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
@@ -38,7 +38,7 @@ public class AfterPaneStateMachine extends OnceTriggerStateMachine {
private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Long>>
ELEMENTS_IN_PANE_TAG =
StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
- "count", VarLongCoder.of(), new Sum.SumLongFn()));
+ "count", VarLongCoder.of(), Sum.ofLongs()));
private final int countElems;
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
index 3cd5d4a..efe2044 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
@@ -114,7 +114,7 @@ public class LateDataDroppingDoFnRunnerTest {
@Override
public CombineFn<Long, ?, Long> getCombineFn() {
- return new Sum.SumLongFn();
+ return Sum.ofLongs();
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 4abfc9a..1bd717f 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -217,7 +217,7 @@ public class ReduceFnRunnerTest {
ReduceFnTester.combining(
strategy,
mockTriggerStateMachine,
- new Sum.SumIntegerFn().<String>asKeyedFn(),
+ Sum.ofIntegers().<String>asKeyedFn(),
VarIntCoder.of());
injectElement(tester, 2);
@@ -290,7 +290,7 @@ public class ReduceFnRunnerTest {
ReduceFnTester<Integer, Integer, IntervalWindow> tester =
ReduceFnTester
- .combining(strategy, new Sum.SumIntegerFn().<String>asKeyedFn(), VarIntCoder.of());
+ .combining(strategy, Sum.ofIntegers().<String>asKeyedFn(), VarIntCoder.of());
tester.injectElements(TimestampedValue.of(13, elementTimestamp));
@@ -322,7 +322,7 @@ public class ReduceFnRunnerTest {
ReduceFnTester.combining(
strategy,
mockTriggerStateMachine,
- new Sum.SumIntegerFn().<String>asKeyedFn(),
+ Sum.ofIntegers().<String>asKeyedFn(),
VarIntCoder.of());
injectElement(tester, 1);
@@ -1069,7 +1069,7 @@ public class ReduceFnRunnerTest {
SlidingWindows.of(Duration.millis(100)).every(Duration.millis(30)))
.withTrigger(AfterWatermark.pastEndOfWindow())
.withAllowedLateness(Duration.millis(1000)),
- new Sum.SumIntegerFn().<String>asKeyedFn(), VarIntCoder.of());
+ Sum.ofIntegers().<String>asKeyedFn(), VarIntCoder.of());
tester.injectElements(
// assigned to [-60, 40), [-30, 70), [0, 100)
@@ -1212,7 +1212,7 @@ public class ReduceFnRunnerTest {
ReduceFnTester<Integer, Integer, IntervalWindow> tester =
ReduceFnTester
- .combining(strategy, new Sum.SumIntegerFn().<String>asKeyedFn(), VarIntCoder.of());
+ .combining(strategy, Sum.ofIntegers().<String>asKeyedFn(), VarIntCoder.of());
tester.advanceInputWatermark(new Instant(0));
tester.advanceProcessingTime(new Instant(0));
@@ -1268,7 +1268,7 @@ public class ReduceFnRunnerTest {
ReduceFnTester<Integer, Integer, IntervalWindow> tester =
ReduceFnTester
- .combining(strategy, new Sum.SumIntegerFn().<String>asKeyedFn(), VarIntCoder.of());
+ .combining(strategy, Sum.ofIntegers().<String>asKeyedFn(), VarIntCoder.of());
tester.advanceInputWatermark(new Instant(0));
tester.advanceProcessingTime(new Instant(0));
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 890195a..226f5f0 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -638,7 +638,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
@Override
public CombineFn<Long, ?, Long> getCombineFn() {
- return new Sum.SumLongFn();
+ return Sum.ofLongs();
}
public long getSum() {
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index bb11923..2bc0d8d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -147,10 +147,10 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
reduceFn = SystemReduceFn.buffering(valueCoder);
droppedDueToClosedWindow = aggregatorChanges.createSystemAggregator(stepContext,
GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER,
- new Sum.SumLongFn());
+ Sum.ofLongs());
droppedDueToLateness = aggregatorChanges.createSystemAggregator(stepContext,
GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER,
- new Sum.SumLongFn());
+ Sum.ofLongs());
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
index f770800..37524eb 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
@@ -26,7 +26,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.core.ExecutionContext.StepContext;
import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Sum.SumIntegerFn;
+import org.apache.beam.sdk.transforms.Sum;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -61,13 +61,13 @@ public class AggregatorContainerTest {
@Test
public void addsAggregatorsOnCommit() {
AggregatorContainer.Mutator mutator = container.createMutator();
- mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", new SumIntegerFn()).addValue(5);
+ mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(5);
mutator.commit();
assertThat((Integer) container.getAggregate(STEP_NAME, "sum_int"), equalTo(5));
mutator = container.createMutator();
- mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", new SumIntegerFn()).addValue(8);
+ mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(8);
assertThat("Shouldn't update value until commit",
(Integer) container.getAggregate(STEP_NAME, "sum_int"), equalTo(5));
@@ -81,14 +81,14 @@ public class AggregatorContainerTest {
mutator.commit();
thrown.expect(IllegalStateException.class);
- mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", new SumIntegerFn()).addValue(5);
+ mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(5);
}
@Test
public void failToAddValueAfterCommit() {
AggregatorContainer.Mutator mutator = container.createMutator();
Aggregator<Integer, ?> aggregator =
- mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", new SumIntegerFn());
+ mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", Sum.ofIntegers());
mutator.commit();
thrown.expect(IllegalStateException.class);
@@ -99,12 +99,12 @@ public class AggregatorContainerTest {
public void failToAddValueAfterCommitWithPrevious() {
AggregatorContainer.Mutator mutator = container.createMutator();
mutator.createAggregatorForDoFn(
- fn, stepContext, "sum_int", new SumIntegerFn()).addValue(5);
+ fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(5);
mutator.commit();
mutator = container.createMutator();
Aggregator<Integer, ?> aggregator = mutator.createAggregatorForDoFn(
- fn, stepContext, "sum_int", new SumIntegerFn());
+ fn, stepContext, "sum_int", Sum.ofIntegers());
mutator.commit();
thrown.expect(IllegalStateException.class);
@@ -123,7 +123,7 @@ public class AggregatorContainerTest {
@Override
public void run() {
mutator.createAggregatorForDoFn(
- fn, stepContext, "sum_int", new SumIntegerFn()).addValue(value);
+ fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(value);
mutator.commit();
}
});
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
index 12ef66c..4284f3e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
@@ -167,7 +167,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
public void testAccumulatorCombiningStateWithUnderlying() throws CannotProvideCoderException {
CopyOnAccessInMemoryStateInternals<String> underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
- CombineFn<Long, long[], Long> sumLongFn = new Sum.SumLongFn();
+ CombineFn<Long, long[], Long> sumLongFn = Sum.ofLongs();
StateNamespace namespace = new StateNamespaceForTest("foo");
CoderRegistry reg = pipeline.getCoderRegistry();
@@ -197,7 +197,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
public void testKeyedAccumulatorCombiningStateWithUnderlying() throws Exception {
CopyOnAccessInMemoryStateInternals<String> underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
- KeyedCombineFn<String, Long, long[], Long> sumLongFn = new Sum.SumLongFn().asKeyedFn();
+ KeyedCombineFn<String, Long, long[], Long> sumLongFn = Sum.ofLongs().asKeyedFn();
StateNamespace namespace = new StateNamespaceForTest("foo");
CoderRegistry reg = pipeline.getCoderRegistry();
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 15340da..ad6e32d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -45,7 +45,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.Sum.SumLongFn;
+import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -250,7 +250,7 @@ public class EvaluationContextTest {
"STEP", createdProducer.getTransform().getName());
AggregatorContainer container = context.getAggregatorContainer();
AggregatorContainer.Mutator mutator = container.createMutator();
- mutator.createAggregatorForDoFn(fn, stepContext, "foo", new SumLongFn()).addValue(4L);
+ mutator.createAggregatorForDoFn(fn, stepContext, "foo", Sum.ofLongs()).addValue(4L);
TransformResult<?> result =
StepTransformResult.withoutHold(createdProducer)
@@ -260,7 +260,7 @@ public class EvaluationContextTest {
assertThat((Long) context.getAggregatorContainer().getAggregate("STEP", "foo"), equalTo(4L));
AggregatorContainer.Mutator mutatorAgain = container.createMutator();
- mutatorAgain.createAggregatorForDoFn(fn, stepContext, "foo", new SumLongFn()).addValue(12L);
+ mutatorAgain.createAggregatorForDoFn(fn, stepContext, "foo", Sum.ofLongs()).addValue(12L);
TransformResult<?> secondResult =
StepTransformResult.withoutHold(downstreamProducer)
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
index b6b3c1a..6ae4cf8 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
@@ -46,7 +46,7 @@ public class WordCount {
*/
public static class ExtractWordsFn extends DoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", new Sum.SumLongFn());
+ createAggregator("emptyLines", Sum.ofLongs());
@ProcessElement
public void processElement(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
index 3405981..f33e616 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
@@ -314,7 +314,7 @@ public class AutoComplete {
static class ExtractWordsFn extends DoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", new Sum.SumLongFn());
+ createAggregator("emptyLines", Sum.ofLongs());
@ProcessElement
public void processElement(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
index 42c42f3..ee0e874 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
@@ -55,7 +55,7 @@ public class KafkaWindowedWordCountExample {
*/
public static class ExtractWordsFn extends DoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", new Sum.SumLongFn());
+ createAggregator("emptyLines", Sum.ofLongs());
@ProcessElement
public void processElement(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
index 2246bdd..792c214 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
@@ -66,7 +66,7 @@ public class WindowedWordCount {
static class ExtractWordsFn extends DoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", new Sum.SumLongFn());
+ createAggregator("emptyLines", Sum.ofLongs());
@ProcessElement
public void processElement(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
index 628212a..126f611 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
@@ -70,7 +70,7 @@ public class FlinkStateInternalsTest {
StateTags.value("stringValue", StringUtf8Coder.of());
private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>>
SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
- "sumInteger", VarIntCoder.of(), new Sum.SumIntegerFn());
+ "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
StateTags.bag("stringBag", StringUtf8Coder.of());
private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 1890da1..6999e03 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -420,7 +420,7 @@ public class DataflowPipelineJobTest {
@Test
public void testGetAggregatorValuesWithSingleMetricUpdateReturnsSingletonCollection()
throws IOException, AggregatorRetrievalException {
- CombineFn<Long, long[], Long> combineFn = new Sum.SumLongFn();
+ CombineFn<Long, long[], Long> combineFn = Sum.ofLongs();
String aggregatorName = "agg";
Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
@SuppressWarnings("unchecked")
@@ -468,7 +468,7 @@ public class DataflowPipelineJobTest {
@Test
public void testGetAggregatorValuesWithMultipleMetricUpdatesReturnsCollection()
throws IOException, AggregatorRetrievalException {
- CombineFn<Long, long[], Long> combineFn = new Sum.SumLongFn();
+ CombineFn<Long, long[], Long> combineFn = Sum.ofLongs();
String aggregatorName = "agg";
Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
@@ -536,7 +536,7 @@ public class DataflowPipelineJobTest {
@Test
public void testGetAggregatorValuesWithUnrelatedMetricUpdateIgnoresUpdate()
throws IOException, AggregatorRetrievalException {
- CombineFn<Long, long[], Long> combineFn = new Sum.SumLongFn();
+ CombineFn<Long, long[], Long> combineFn = Sum.ofLongs();
String aggregatorName = "agg";
Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
@SuppressWarnings("unchecked")
@@ -600,7 +600,7 @@ public class DataflowPipelineJobTest {
@Test
public void testGetAggregatorValuesWhenClientThrowsExceptionThrowsAggregatorRetrievalException()
throws IOException, AggregatorRetrievalException {
- CombineFn<Long, long[], Long> combineFn = new Sum.SumLongFn();
+ CombineFn<Long, long[], Long> combineFn = Sum.ofLongs();
String aggregatorName = "agg";
Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
index 4e96466..52fe994 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
@@ -153,7 +153,9 @@ public class NamedAggregators implements Serializable {
}
/**
- * => combineFunction in data flow.
+ * @param <InputT> Input data type
+ * @param <InterT> Intermediate data type (useful for averages)
+ * @param <OutputT> Output data type
*/
public static class CombineFunctionState<InputT, InterT, OutputT>
implements State<InputT, InterT, OutputT> {
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
index 1252d12..da14ee2 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
@@ -46,7 +46,7 @@ public class WordCount {
*/
static class ExtractWordsFn extends DoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", new Sum.SumLongFn());
+ createAggregator("emptyLines", Sum.ofLongs());
@ProcessElement
public void processElement(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
index 5432d58..b615132 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
@@ -78,7 +78,7 @@ public class SparkGroupAlsoByWindowFn<K, InputT, W extends BoundedWindow>
droppedDueToClosedWindow = runtimeContext.createAggregator(
accumulator,
GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER,
- new Sum.SumLongFn());
+ Sum.ofLongs());
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
index 01b6b54..9c3d79f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
@@ -32,10 +32,6 @@ import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Max;
-import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.spark.Accumulator;
/**
@@ -92,18 +88,26 @@ public class SparkRuntimeContext implements Serializable {
Combine.CombineFn<? super InputT, InterT, OutputT> combineFn) {
@SuppressWarnings("unchecked")
Aggregator<InputT, OutputT> aggregator = (Aggregator<InputT, OutputT>) aggregators.get(named);
- if (aggregator == null) {
- @SuppressWarnings("unchecked")
- NamedAggregators.CombineFunctionState<InputT, InterT, OutputT> state =
- new NamedAggregators.CombineFunctionState<>(
- (Combine.CombineFn<InputT, InterT, OutputT>) combineFn,
- (Coder<InputT>) getCoder(combineFn),
- this);
- accum.add(new NamedAggregators(named, state));
- aggregator = new SparkAggregator<>(named, state);
- aggregators.put(named, aggregator);
+ try {
+ if (aggregator == null) {
+ @SuppressWarnings("unchecked")
+ final
+ NamedAggregators.CombineFunctionState<InputT, InterT, OutputT> state =
+ new NamedAggregators.CombineFunctionState<>(
+ (Combine.CombineFn<InputT, InterT, OutputT>) combineFn,
+ // hidden assumption: InputT == OutputT
+ (Coder<InputT>) getCoderRegistry().getCoder(combineFn.getOutputType()),
+ this);
+
+ accum.add(new NamedAggregators(named, state));
+ aggregator = new SparkAggregator<>(named, state);
+ aggregators.put(named, aggregator);
+ }
+ return aggregator;
+ } catch (CannotProvideCoderException e) {
+ throw new RuntimeException(String.format("Unable to create an aggregator named: [%s]", named),
+ e);
}
- return aggregator;
}
public CoderRegistry getCoderRegistry() {
@@ -114,35 +118,6 @@ public class SparkRuntimeContext implements Serializable {
return coderRegistry;
}
- private Coder<?> getCoder(Combine.CombineFn<?, ?, ?> combiner) {
- try {
- if (combiner.getClass() == Sum.SumIntegerFn.class) {
- return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
- } else if (combiner.getClass() == Sum.SumLongFn.class) {
- return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
- } else if (combiner.getClass() == Sum.SumDoubleFn.class) {
- return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
- } else if (combiner.getClass() == Min.MinIntegerFn.class) {
- return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
- } else if (combiner.getClass() == Min.MinLongFn.class) {
- return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
- } else if (combiner.getClass() == Min.MinDoubleFn.class) {
- return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
- } else if (combiner.getClass() == Max.MaxIntegerFn.class) {
- return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
- } else if (combiner.getClass() == Max.MaxLongFn.class) {
- return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
- } else if (combiner.getClass() == Max.MaxDoubleFn.class) {
- return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
- } else {
- throw new IllegalArgumentException("unsupported combiner in Aggregator: "
- + combiner.getClass().getName());
- }
- } catch (CannotProvideCoderException e) {
- throw new IllegalStateException("Could not determine default coder for combiner", e);
- }
- }
-
/**
* Initialize spark aggregators exactly once.
*
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index 2718b5f..ab04c5c 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -182,7 +182,7 @@ public class ResumeFromCheckpointStreamingTest {
private static class FormatAsText extends DoFn<KV<String, String>, String> {
private final Aggregator<Long, Long> aggregator =
- createAggregator("processedMessages", new Sum.SumLongFn());
+ createAggregator("processedMessages", Sum.ofLongs());
@ProcessElement
public void process(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
index 0284b3d..cd9de92 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
@@ -97,9 +97,9 @@ public final class PAssertStreaming implements Serializable {
private static class AssertDoFn<T> extends DoFn<Iterable<T>, Void> {
private final Aggregator<Integer, Integer> success =
- createAggregator(PAssert.SUCCESS_COUNTER, new Sum.SumIntegerFn());
+ createAggregator(PAssert.SUCCESS_COUNTER, Sum.ofIntegers());
private final Aggregator<Integer, Integer> failure =
- createAggregator(PAssert.FAILURE_COUNTER, new Sum.SumIntegerFn());
+ createAggregator(PAssert.FAILURE_COUNTER, Sum.ofIntegers());
private final T[] expected;
AssertDoFn(T[] expected) {
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
index 58414c6..75f6b7d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
@@ -154,7 +154,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
*/
private static class ShardFn<T> extends DoFn<T, KV<Integer, OutgoingMessage>> {
private final Aggregator<Long, Long> elementCounter =
- createAggregator("elements", new Sum.SumLongFn());
+ createAggregator("elements", Sum.ofLongs());
private final Coder<T> elementCoder;
private final int numShards;
private final RecordIdMethod recordIdMethod;
@@ -219,11 +219,11 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
private transient PubsubClient pubsubClient;
private final Aggregator<Long, Long> batchCounter =
- createAggregator("batches", new Sum.SumLongFn());
+ createAggregator("batches", Sum.ofLongs());
private final Aggregator<Long, Long> elementCounter =
- createAggregator("elements", new Sum.SumLongFn());
+ createAggregator("elements", Sum.ofLongs());
private final Aggregator<Long, Long> byteCounter =
- createAggregator("bytes", new Sum.SumLongFn());
+ createAggregator("bytes", Sum.ofLongs());
WriterFn(
PubsubClientFactory pubsubFactory, ValueProvider<TopicPath> topic,
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
index da3b437..4b3792d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
@@ -59,7 +59,6 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.Sum.SumLongFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -216,7 +215,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
}
};
- private static final Combine.BinaryCombineLongFn SUM = new SumLongFn();
+ private static final Combine.BinaryCombineLongFn SUM = Sum.ofLongs();
// ================================================================================
// Checkpoint
@@ -1159,7 +1158,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
private static class StatsFn<T> extends DoFn<T, T> {
private final Aggregator<Long, Long> elementCounter =
- createAggregator("elements", new Sum.SumLongFn());
+ createAggregator("elements", Sum.ofLongs());
private final PubsubClientFactory pubsubFactory;
private final ValueProvider<SubscriptionPath> subscription;
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index b23f4f3..b57f4a9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -1019,9 +1019,9 @@ public class PAssert {
private static class SideInputCheckerDoFn<ActualT> extends DoFn<Integer, Void> {
private final SerializableFunction<ActualT, Void> checkerFn;
private final Aggregator<Integer, Integer> success =
- createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
+ createAggregator(SUCCESS_COUNTER, Sum.ofIntegers());
private final Aggregator<Integer, Integer> failure =
- createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
+ createAggregator(FAILURE_COUNTER, Sum.ofIntegers());
private final PCollectionView<ActualT> actual;
private SideInputCheckerDoFn(
@@ -1054,9 +1054,9 @@ public class PAssert {
private static class GroupedValuesCheckerDoFn<ActualT> extends DoFn<ActualT, Void> {
private final SerializableFunction<ActualT, Void> checkerFn;
private final Aggregator<Integer, Integer> success =
- createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
+ createAggregator(SUCCESS_COUNTER, Sum.ofIntegers());
private final Aggregator<Integer, Integer> failure =
- createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
+ createAggregator(FAILURE_COUNTER, Sum.ofIntegers());
private GroupedValuesCheckerDoFn(SerializableFunction<ActualT, Void> checkerFn) {
this.checkerFn = checkerFn;
@@ -1079,9 +1079,9 @@ public class PAssert {
private static class SingletonCheckerDoFn<ActualT> extends DoFn<Iterable<ActualT>, Void> {
private final SerializableFunction<ActualT, Void> checkerFn;
private final Aggregator<Integer, Integer> success =
- createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
+ createAggregator(SUCCESS_COUNTER, Sum.ofIntegers());
private final Aggregator<Integer, Integer> failure =
- createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
+ createAggregator(FAILURE_COUNTER, Sum.ofIntegers());
private SingletonCheckerDoFn(SerializableFunction<ActualT, Void> checkerFn) {
this.checkerFn = checkerFn;
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
index 0990ca4..696bed9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
@@ -110,13 +110,69 @@ public class Max {
}
/**
+ * A {@code CombineFn} that computes the maximum of a collection of {@code Integer}s, useful as an
+ * argument to {@link Combine#globally} or {@link Combine#perKey}.
+ */
+ public static Combine.BinaryCombineIntegerFn ofIntegers() {
+ return new Max.MaxIntegerFn();
+ }
+
+ /**
+ * A {@code CombineFn} that computes the maximum of a collection of {@code Long}s, useful as an
+ * argument to {@link Combine#globally} or {@link Combine#perKey}.
+ */
+ public static Combine.BinaryCombineLongFn ofLongs() {
+ return new Max.MaxLongFn();
+ }
+
+ /**
+ * A {@code CombineFn} that computes the maximum of a collection of {@code Double}s, useful as an
+ * argument to {@link Combine#globally} or {@link Combine#perKey}.
+ */
+ public static Combine.BinaryCombineDoubleFn ofDoubles() {
+ return new Max.MaxDoubleFn();
+ }
+
+ /**
+ * A {@code CombineFn} that computes the maximum of a collection of elements of type {@code T}
+ * using an arbitrary {@link Comparator} and {@code identity},
+ * useful as an argument to {@link Combine#globally} or {@link Combine#perKey}.
+ *
+ * @param <T> the type of the values being compared
+ */
+ public static <T, ComparatorT extends Comparator<? super T> & Serializable>
+ BinaryCombineFn<T> of(final T identity, final ComparatorT comparator) {
+ return new MaxFn<T>(identity, comparator);
+ }
+
+ /**
+ * A {@code CombineFn} that computes the maximum of a collection of elements of type {@code T}
+ * using an arbitrary {@link Comparator}, useful as an argument to {@link Combine#globally} or
+ * {@link Combine#perKey}.
+ *
+ * @param <T> the type of the values being compared
+ */
+ public static <T, ComparatorT extends Comparator<? super T> & Serializable>
+ BinaryCombineFn<T> of(final ComparatorT comparator) {
+ return new MaxFn<T>(null, comparator);
+ }
+
+ public static <T extends Comparable<? super T>> BinaryCombineFn<T> naturalOrder(T identity) {
+ return new MaxFn<T>(identity, new Top.Largest<T>());
+ }
+
+ public static <T extends Comparable<? super T>> BinaryCombineFn<T> naturalOrder() {
+ return new MaxFn<T>(null, new Top.Largest<T>());
+ }
+
+ /**
* Returns a {@code PTransform} that takes an input {@code PCollection<T>} and returns a {@code
* PCollection<T>} whose contents is the maximum according to the natural ordering of {@code T}
* of the input {@code PCollection}'s elements, or {@code null} if there are no elements.
*/
public static <T extends Comparable<? super T>>
Combine.Globally<T, T> globally() {
- return Combine.<T, T>globally(MaxFn.<T>naturalOrder());
+ return Combine.<T, T>globally(Max.<T>naturalOrder());
}
/**
@@ -129,7 +185,7 @@ public class Max {
*/
public static <K, T extends Comparable<? super T>>
Combine.PerKey<K, T, T> perKey() {
- return Combine.<K, T, T>perKey(MaxFn.<T>naturalOrder());
+ return Combine.<K, T, T>perKey(Max.<T>naturalOrder());
}
/**
@@ -139,7 +195,7 @@ public class Max {
*/
public static <T, ComparatorT extends Comparator<? super T> & Serializable>
Combine.Globally<T, T> globally(ComparatorT comparator) {
- return Combine.<T, T>globally(MaxFn.of(comparator));
+ return Combine.<T, T>globally(Max.of(comparator));
}
/**
@@ -151,19 +207,12 @@ public class Max {
*/
public static <K, T, ComparatorT extends Comparator<? super T> & Serializable>
Combine.PerKey<K, T, T> perKey(ComparatorT comparator) {
- return Combine.<K, T, T>perKey(MaxFn.of(comparator));
+ return Combine.<K, T, T>perKey(Max.of(comparator));
}
/////////////////////////////////////////////////////////////////////////////
- /**
- * A {@code CombineFn} that computes the maximum of a collection of elements of type {@code T}
- * using an arbitrary {@link Comparator}, useful as an argument to {@link Combine#globally} or
- * {@link Combine#perKey}.
- *
- * @param <T> the type of the values being compared
- */
- public static class MaxFn<T> extends BinaryCombineFn<T> {
+ private static class MaxFn<T> extends BinaryCombineFn<T> {
private final T identity;
private final Comparator<? super T> comparator;
@@ -174,24 +223,6 @@ public class Max {
this.comparator = comparator;
}
- public static <T, ComparatorT extends Comparator<? super T> & Serializable>
- MaxFn<T> of(T identity, ComparatorT comparator) {
- return new MaxFn<T>(identity, comparator);
- }
-
- public static <T, ComparatorT extends Comparator<? super T> & Serializable>
- MaxFn<T> of(ComparatorT comparator) {
- return new MaxFn<T>(null, comparator);
- }
-
- public static <T extends Comparable<? super T>> MaxFn<T> naturalOrder(T identity) {
- return new MaxFn<T>(identity, new Top.Largest<T>());
- }
-
- public static <T extends Comparable<? super T>> MaxFn<T> naturalOrder() {
- return new MaxFn<T>(null, new Top.Largest<T>());
- }
-
@Override
public T identity() {
return identity;
@@ -210,11 +241,8 @@ public class Max {
}
}
- /**
- * A {@code CombineFn} that computes the maximum of a collection of {@code Integer}s, useful as an
- * argument to {@link Combine#globally} or {@link Combine#perKey}.
- */
- public static class MaxIntegerFn extends Combine.BinaryCombineIntegerFn {
+ private static class MaxIntegerFn extends Combine.BinaryCombineIntegerFn {
+
@Override
public int apply(int left, int right) {
return left >= right ? left : right;
@@ -226,11 +254,8 @@ public class Max {
}
}
- /**
- * A {@code CombineFn} that computes the maximum of a collection of {@code Long}s, useful as an
- * argument to {@link Combine#globally} or {@link Combine#perKey}.
- */
- public static class MaxLongFn extends Combine.BinaryCombineLongFn {
+ private static class MaxLongFn extends Combine.BinaryCombineLongFn {
+
@Override
public long apply(long left, long right) {
return left >= right ? left : right;
@@ -242,11 +267,8 @@ public class Max {
}
}
- /**
- * A {@code CombineFn} that computes the maximum of a collection of {@code Double}s, useful as an
- * argument to {@link Combine#globally} or {@link Combine#perKey}.
- */
- public static class MaxDoubleFn extends Combine.BinaryCombineDoubleFn {
+ private static class MaxDoubleFn extends Combine.BinaryCombineDoubleFn {
+
@Override
public double apply(double left, double right) {
return left >= right ? left : right;
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
index cb77ba3..7e62e9d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
@@ -84,9 +84,6 @@ public class Mean {
return Combine.<K, NumT, Double>perKey(new MeanFn<>());
}
-
- /////////////////////////////////////////////////////////////////////////////
-
/**
* A {@code Combine.CombineFn} that computes the arithmetic mean
* (a.k.a. average) of an {@code Iterable} of numbers of type
@@ -97,13 +94,19 @@ public class Mean {
*
* @param <NumT> the type of the {@code Number}s being combined
*/
- static class MeanFn<NumT extends Number>
+ public static <NumT extends Number>
+ Combine.AccumulatingCombineFn<NumT, CountSum<NumT>, Double> of() {
+ return new MeanFn<>();
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
+ private static class MeanFn<NumT extends Number>
extends Combine.AccumulatingCombineFn<NumT, CountSum<NumT>, Double> {
/**
* Constructs a combining function that computes the mean over
* a collection of values of type {@code N}.
*/
- public MeanFn() {}
@Override
public CountSum<NumT> createAccumulator() {
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
index 5003594..b208929 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
@@ -110,13 +110,69 @@ public class Min {
}
/**
+ * A {@code CombineFn} that computes the minimum of a collection of {@code Integer}s, useful as an
+ * argument to {@link Combine#globally} or {@link Combine#perKey}.
+ */
+ public static Combine.BinaryCombineIntegerFn ofIntegers() {
+ return new Min.MinIntegerFn();
+ }
+
+ /**
+ * A {@code CombineFn} that computes the minimum of a collection of {@code Long}s, useful as an
+ * argument to {@link Combine#globally} or {@link Combine#perKey}.
+ */
+ public static Combine.BinaryCombineLongFn ofLongs() {
+ return new Min.MinLongFn();
+ }
+
+ /**
+ * A {@code CombineFn} that computes the minimum of a collection of {@code Double}s, useful as an
+ * argument to {@link Combine#globally} or {@link Combine#perKey}.
+ */
+ public static Combine.BinaryCombineDoubleFn ofDoubles() {
+ return new Min.MinDoubleFn();
+ }
+
+ /**
+ * A {@code CombineFn} that computes the maximum of a collection of elements of type {@code T}
+ * using an arbitrary {@link Comparator} and an {@code identity},
+ * useful as an argument to {@link Combine#globally} or {@link Combine#perKey}.
+ *
+ * @param <T> the type of the values being compared
+ */
+ public static <T, ComparatorT extends Comparator<? super T> & Serializable>
+ BinaryCombineFn<T> of(T identity, ComparatorT comparator) {
+ return new MinFn<T>(identity, comparator);
+ }
+
+ /**
+ * A {@code CombineFn} that computes the maximum of a collection of elements of type {@code T}
+ * using an arbitrary {@link Comparator}, useful as an argument to {@link Combine#globally} or
+ * {@link Combine#perKey}.
+ *
+ * @param <T> the type of the values being compared
+ */
+ public static <T, ComparatorT extends Comparator<? super T> & Serializable>
+ BinaryCombineFn<T> of(ComparatorT comparator) {
+ return new MinFn<T>(null, comparator);
+ }
+
+ public static <T extends Comparable<? super T>> BinaryCombineFn<T> naturalOrder(T identity) {
+ return new MinFn<T>(identity, new Top.Largest<T>());
+ }
+
+ public static <T extends Comparable<? super T>> BinaryCombineFn<T> naturalOrder() {
+ return new MinFn<T>(null, new Top.Largest<T>());
+ }
+
+ /**
* Returns a {@code PTransform} that takes an input {@code PCollection<T>} and returns a {@code
* PCollection<T>} whose contents is the minimum according to the natural ordering of {@code T}
* of the input {@code PCollection}'s elements, or {@code null} if there are no elements.
*/
public static <T extends Comparable<? super T>>
Combine.Globally<T, T> globally() {
- return Combine.<T, T>globally(MinFn.<T>naturalOrder());
+ return Combine.<T, T>globally(Min.<T>naturalOrder());
}
/**
@@ -129,7 +185,7 @@ public class Min {
*/
public static <K, T extends Comparable<? super T>>
Combine.PerKey<K, T, T> perKey() {
- return Combine.<K, T, T>perKey(MinFn.<T>naturalOrder());
+ return Combine.<K, T, T>perKey(Min.<T>naturalOrder());
}
/**
@@ -139,7 +195,7 @@ public class Min {
*/
public static <T, ComparatorT extends Comparator<? super T> & Serializable>
Combine.Globally<T, T> globally(ComparatorT comparator) {
- return Combine.<T, T>globally(MinFn.of(comparator));
+ return Combine.<T, T>globally(Min.of(comparator));
}
/**
@@ -151,19 +207,12 @@ public class Min {
*/
public static <K, T, ComparatorT extends Comparator<? super T> & Serializable>
Combine.PerKey<K, T, T> perKey(ComparatorT comparator) {
- return Combine.<K, T, T>perKey(MinFn.of(comparator));
+ return Combine.<K, T, T>perKey(Min.of(comparator));
}
/////////////////////////////////////////////////////////////////////////////
- /**
- * A {@code CombineFn} that computes the maximum of a collection of elements of type {@code T}
- * using an arbitrary {@link Comparator}, useful as an argument to {@link Combine#globally} or
- * {@link Combine#perKey}.
- *
- * @param <T> the type of the values being compared
- */
- public static class MinFn<T> extends BinaryCombineFn<T> {
+ private static class MinFn<T> extends BinaryCombineFn<T> {
private final T identity;
private final Comparator<? super T> comparator;
@@ -174,24 +223,6 @@ public class Min {
this.comparator = comparator;
}
- public static <T, ComparatorT extends Comparator<? super T> & Serializable>
- MinFn<T> of(T identity, ComparatorT comparator) {
- return new MinFn<T>(identity, comparator);
- }
-
- public static <T, ComparatorT extends Comparator<? super T> & Serializable>
- MinFn<T> of(ComparatorT comparator) {
- return new MinFn<T>(null, comparator);
- }
-
- public static <T extends Comparable<? super T>> MinFn<T> naturalOrder(T identity) {
- return new MinFn<T>(identity, new Top.Largest<T>());
- }
-
- public static <T extends Comparable<? super T>> MinFn<T> naturalOrder() {
- return new MinFn<T>(null, new Top.Largest<T>());
- }
-
@Override
public T identity() {
return identity;
@@ -210,11 +241,7 @@ public class Min {
}
}
- /**
- * A {@code CombineFn} that computes the minimum of a collection of {@code Integer}s, useful as an
- * argument to {@link Combine#globally} or {@link Combine#perKey}.
- */
- public static class MinIntegerFn extends Combine.BinaryCombineIntegerFn {
+ private static class MinIntegerFn extends Combine.BinaryCombineIntegerFn {
@Override
public int apply(int left, int right) {
@@ -227,11 +254,8 @@ public class Min {
}
}
- /**
- * A {@code CombineFn} that computes the minimum of a collection of {@code Long}s, useful as an
- * argument to {@link Combine#globally} or {@link Combine#perKey}.
- */
- public static class MinLongFn extends Combine.BinaryCombineLongFn {
+ private static class MinLongFn extends Combine.BinaryCombineLongFn {
+
@Override
public long apply(long left, long right) {
return left <= right ? left : right;
@@ -243,11 +267,7 @@ public class Min {
}
}
- /**
- * A {@code CombineFn} that computes the minimum of a collection of {@code Double}s, useful as an
- * argument to {@link Combine#globally} or {@link Combine#perKey}.
- */
- public static class MinDoubleFn extends Combine.BinaryCombineDoubleFn {
+ private static class MinDoubleFn extends Combine.BinaryCombineDoubleFn {
@Override
public double apply(double left, double right) {
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
index 48eafc3..ccade4d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
@@ -50,7 +50,7 @@ public class Sum {
* {@code 0} if there are no elements.
*/
public static Combine.Globally<Integer, Integer> integersGlobally() {
- return Combine.globally(new SumIntegerFn());
+ return Combine.globally(Sum.ofIntegers());
}
/**
@@ -62,7 +62,7 @@ public class Sum {
* that key in the input {@code PCollection}.
*/
public static <K> Combine.PerKey<K, Integer, Integer> integersPerKey() {
- return Combine.<K, Integer, Integer>perKey(new SumIntegerFn());
+ return Combine.<K, Integer, Integer>perKey(Sum.ofIntegers());
}
/**
@@ -73,7 +73,7 @@ public class Sum {
* {@code 0} if there are no elements.
*/
public static Combine.Globally<Long, Long> longsGlobally() {
- return Combine.globally(new SumLongFn());
+ return Combine.globally(Sum.ofLongs());
}
/**
@@ -85,7 +85,7 @@ public class Sum {
* that key in the input {@code PCollection}.
*/
public static <K> Combine.PerKey<K, Long, Long> longsPerKey() {
- return Combine.<K, Long, Long>perKey(new SumLongFn());
+ return Combine.<K, Long, Long>perKey(Sum.ofLongs());
}
/**
@@ -96,7 +96,7 @@ public class Sum {
* {@code 0} if there are no elements.
*/
public static Combine.Globally<Double, Double> doublesGlobally() {
- return Combine.globally(new SumDoubleFn());
+ return Combine.globally(Sum.ofDoubles());
}
/**
@@ -108,18 +108,40 @@ public class Sum {
* that key in the input {@code PCollection}.
*/
public static <K> Combine.PerKey<K, Double, Double> doublesPerKey() {
- return Combine.<K, Double, Double>perKey(new SumDoubleFn());
+ return Combine.<K, Double, Double>perKey(Sum.ofDoubles());
}
+ /**
+ * A {@code SerializableFunction} that computes the sum of an
+ * {@code Iterable} of {@code Integer}s, useful as an argument to
+ * {@link Combine#globally} or {@link Combine#perKey}.
+ */
+ public static Combine.BinaryCombineIntegerFn ofIntegers() {
+ return new SumIntegerFn();
+ }
- /////////////////////////////////////////////////////////////////////////////
+ /**
+ * A {@code SerializableFunction} that computes the sum of an
+ * {@code Iterable} of {@code Double}s, useful as an argument to
+ * {@link Combine#globally} or {@link Combine#perKey}.
+ */
+ public static Combine.BinaryCombineDoubleFn ofDoubles() {
+ return new SumDoubleFn();
+ }
/**
* A {@code SerializableFunction} that computes the sum of an
- * {@code Iterable} of {@code Integer}s, useful as an argument to
+ * {@code Iterable} of {@code Long}s, useful as an argument to
* {@link Combine#globally} or {@link Combine#perKey}.
*/
- public static class SumIntegerFn extends Combine.BinaryCombineIntegerFn {
+ public static Combine.BinaryCombineLongFn ofLongs() {
+ return new SumLongFn();
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
+ private static class SumIntegerFn extends Combine.BinaryCombineIntegerFn {
+
@Override
public int apply(int a, int b) {
return a + b;
@@ -131,13 +153,8 @@ public class Sum {
}
}
- /**
- * A {@code SerializableFunction} that computes the sum of an
- * {@code Iterable} of {@code Long}s, useful as an argument to
- * {@link Combine#globally} or {@link Combine#perKey}.
- */
- public static class SumLongFn
- extends Combine.BinaryCombineLongFn {
+ private static class SumLongFn extends Combine.BinaryCombineLongFn {
+
@Override
public long apply(long a, long b) {
return a + b;
@@ -149,12 +166,8 @@ public class Sum {
}
}
- /**
- * A {@code SerializableFunction} that computes the sum of an
- * {@code Iterable} of {@code Double}s, useful as an argument to
- * {@link Combine#globally} or {@link Combine#perKey}.
- */
- public static class SumDoubleFn extends Combine.BinaryCombineDoubleFn {
+ private static class SumDoubleFn extends Combine.BinaryCombineDoubleFn {
+
@Override
public double apply(double a, double b) {
return a + b;
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
index 9daecb2..6392fb5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
@@ -51,7 +51,7 @@ public abstract class AfterDelayFromFirstElement extends OnceTrigger {
protected static final StateTag<Object, AccumulatorCombiningState<Instant,
Combine.Holder<Instant>, Instant>> DELAYED_UNTIL_TAG =
StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
- "delayed", InstantCoder.of(), Min.MinFn.<Instant>naturalOrder()));
+ "delayed", InstantCoder.of(), Min.<Instant>naturalOrder()));
private static final PeriodFormatter PERIOD_FORMATTER = PeriodFormat.wordBased(Locale.ENGLISH);
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
index 4a706e6..d66e1c4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
@@ -37,7 +37,7 @@ public class AfterPane extends OnceTrigger {
private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Long>>
ELEMENTS_IN_PANE_TAG =
StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
- "count", VarLongCoder.of(), new Sum.SumLongFn()));
+ "count", VarLongCoder.of(), Sum.ofLongs()));
private final int countElems;
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
index 1bf2c3d..22efd85 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
@@ -70,8 +70,8 @@ public class AggregatorPipelineExtractorTest {
AggregatorProvidingDoFn<ThreadGroup, StrictMath> fn = new AggregatorProvidingDoFn<>();
when(bound.getFn()).thenReturn(fn);
- Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn());
- Aggregator<Integer, Integer> aggregatorTwo = fn.addAggregator(new Min.MinIntegerFn());
+ Aggregator<Long, Long> aggregatorOne = fn.addAggregator(Sum.ofLongs());
+ Aggregator<Integer, Integer> aggregatorTwo = fn.addAggregator(Min.ofIntegers());
TransformHierarchy.Node transformNode = mock(TransformHierarchy.Node.class);
when(transformNode.getTransform()).thenReturn(bound);
@@ -98,8 +98,8 @@ public class AggregatorPipelineExtractorTest {
AggregatorProvidingDoFn<Object, Void> fn = new AggregatorProvidingDoFn<>();
when(bound.getFn()).thenReturn(fn);
- Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Max.MaxLongFn());
- Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(new Min.MinDoubleFn());
+ Aggregator<Long, Long> aggregatorOne = fn.addAggregator(Max.ofLongs());
+ Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(Min.ofDoubles());
TransformHierarchy.Node transformNode = mock(TransformHierarchy.Node.class);
when(transformNode.getTransform()).thenReturn(bound);
@@ -129,8 +129,8 @@ public class AggregatorPipelineExtractorTest {
when(bound.getFn()).thenReturn(fn);
when(otherBound.getFn()).thenReturn(fn);
- Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn());
- Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(new Min.MinDoubleFn());
+ Aggregator<Long, Long> aggregatorOne = fn.addAggregator(Sum.ofLongs());
+ Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(Min.ofDoubles());
TransformHierarchy.Node transformNode = mock(TransformHierarchy.Node.class);
when(transformNode.getTransform()).thenReturn(bound);
@@ -160,7 +160,7 @@ public class AggregatorPipelineExtractorTest {
ParDo.Bound bound = mock(ParDo.Bound.class, "Bound");
AggregatorProvidingDoFn<ThreadGroup, Void> fn = new AggregatorProvidingDoFn<>();
- Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn());
+ Aggregator<Long, Long> aggregatorOne = fn.addAggregator(Sum.ofLongs());
when(bound.getFn()).thenReturn(fn);
@@ -168,7 +168,7 @@ public class AggregatorPipelineExtractorTest {
ParDo.BoundMulti otherBound = mock(ParDo.BoundMulti.class, "otherBound");
AggregatorProvidingDoFn<Long, Long> otherFn = new AggregatorProvidingDoFn<>();
- Aggregator<Double, Double> aggregatorTwo = otherFn.addAggregator(new Sum.SumDoubleFn());
+ Aggregator<Double, Double> aggregatorTwo = otherFn.addAggregator(Sum.ofDoubles());
when(otherBound.getFn()).thenReturn(otherFn);
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
index cdd4707..4d35e53 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
@@ -41,8 +41,6 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn;
import org.apache.beam.sdk.transforms.CombineFns.CoCombineResult;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import org.apache.beam.sdk.transforms.Max.MaxIntegerFn;
-import org.apache.beam.sdk.transforms.Min.MinIntegerFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -71,8 +69,8 @@ public class CombineFnsTest {
TupleTag<Integer> tag = new TupleTag<Integer>();
CombineFns.compose()
- .with(new GetIntegerFunction(), new MaxIntegerFn(), tag)
- .with(new GetIntegerFunction(), new MinIntegerFn(), tag);
+ .with(new GetIntegerFunction(), Max.ofIntegers(), tag)
+ .with(new GetIntegerFunction(), Min.ofIntegers(), tag);
}
@Test
@@ -82,8 +80,8 @@ public class CombineFnsTest {
TupleTag<Integer> tag = new TupleTag<Integer>();
CombineFns.composeKeyed()
- .with(new GetIntegerFunction(), new MaxIntegerFn(), tag)
- .with(new GetIntegerFunction(), new MinIntegerFn(), tag);
+ .with(new GetIntegerFunction(), Max.ofIntegers(), tag)
+ .with(new GetIntegerFunction(), Min.ofIntegers(), tag);
}
@Test
@@ -145,7 +143,7 @@ public class CombineFnsTest {
.apply(Combine.globally(CombineFns.compose()
.with(
new GetIntegerFunction(),
- new MaxIntegerFn(),
+ Max.ofIntegers(),
maxIntTag)
.with(
new GetUserStringFunction(),
@@ -159,7 +157,7 @@ public class CombineFnsTest {
.apply(Combine.perKey(CombineFns.composeKeyed()
.with(
new GetIntegerFunction(),
- new MaxIntegerFn().<String>asKeyedFn(),
+ Max.ofIntegers().<String>asKeyedFn(),
maxIntTag)
.with(
new GetUserStringFunction(),
@@ -203,7 +201,7 @@ public class CombineFnsTest {
.apply(Combine.globally(CombineFns.compose()
.with(
new GetIntegerFunction(),
- new MaxIntegerFn(),
+ Max.ofIntegers(),
maxIntTag)
.with(
new GetUserStringFunction(),
@@ -219,7 +217,7 @@ public class CombineFnsTest {
.apply(Combine.perKey(CombineFns.composeKeyed()
.with(
new GetIntegerFunction(),
- new MaxIntegerFn().<String>asKeyedFn(),
+ Max.ofIntegers().<String>asKeyedFn(),
maxIntTag)
.with(
new GetUserStringFunction(),
@@ -262,7 +260,7 @@ public class CombineFnsTest {
.apply(Combine.perKey(CombineFns.composeKeyed()
.with(
new GetIntegerFunction(),
- new MaxIntegerFn().<String>asKeyedFn(),
+ Max.ofIntegers().<String>asKeyedFn(),
maxIntTag)
.with(
new GetUserStringFunction(),