You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/01/03 22:36:37 UTC

[2/7] beam git commit: [BEAM-1223] Reduced [Sum|Min|Max|Mean]*Fn visibility so as to make them an internal implementation detail and prevent external code from employing them for tasks such as pipeline translation.

[BEAM-1223] Reduced [Sum|Min|Max|Mean]*Fn visibility so as to make them an internal implementation detail and prevent external code from employing them for tasks such as pipeline translation.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/78a360ea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/78a360ea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/78a360ea

Branch: refs/heads/master
Commit: 78a360eac35507d9a558fc6117bb56b67b8a884e
Parents: e794f14
Author: Stas Levin <st...@gmail.com>
Authored: Sun Jan 1 13:58:32 2017 +0200
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Jan 3 14:25:26 2017 -0800

----------------------------------------------------------------------
 .../beam/examples/DebuggingWordCount.java       |   4 +-
 .../org/apache/beam/examples/WordCount.java     |   2 +-
 .../cookbook/CombinePerKeyExamples.java         |   2 +-
 .../beam/examples/complete/game/GameStats.java  |   2 +-
 .../beam/examples/complete/game/UserScore.java  |   2 +-
 .../runners/apex/examples/WordCountTest.java    |   2 +-
 .../utils/ApexStateInternalsTest.java           |   2 +-
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |   4 +-
 .../runners/core/GroupAlsoByWindowsDoFn.java    |   4 +-
 .../apache/beam/runners/core/NonEmptyPanes.java |   2 +-
 .../AfterDelayFromFirstElementStateMachine.java |   2 +-
 .../core/triggers/AfterPaneStateMachine.java    |   2 +-
 .../core/LateDataDroppingDoFnRunnerTest.java    |   2 +-
 .../beam/runners/core/ReduceFnRunnerTest.java   |  12 +-
 .../beam/runners/core/ReduceFnTester.java       |   2 +-
 .../GroupAlsoByWindowEvaluatorFactory.java      |   4 +-
 .../runners/direct/AggregatorContainerTest.java |  16 +--
 .../CopyOnAccessInMemoryStateInternalsTest.java |   4 +-
 .../runners/direct/EvaluationContextTest.java   |   6 +-
 .../beam/runners/flink/examples/WordCount.java  |   2 +-
 .../flink/examples/streaming/AutoComplete.java  |   2 +-
 .../KafkaWindowedWordCountExample.java          |   2 +-
 .../examples/streaming/WindowedWordCount.java   |   2 +-
 .../streaming/FlinkStateInternalsTest.java      |   2 +-
 .../dataflow/DataflowPipelineJobTest.java       |   8 +-
 .../spark/aggregators/NamedAggregators.java     |   4 +-
 .../beam/runners/spark/examples/WordCount.java  |   2 +-
 .../translation/SparkGroupAlsoByWindowFn.java   |   2 +-
 .../spark/translation/SparkRuntimeContext.java  |  63 ++++-------
 .../ResumeFromCheckpointStreamingTest.java      |   2 +-
 .../streaming/utils/PAssertStreaming.java       |   4 +-
 .../apache/beam/sdk/io/PubsubUnboundedSink.java |   8 +-
 .../beam/sdk/io/PubsubUnboundedSource.java      |   5 +-
 .../org/apache/beam/sdk/testing/PAssert.java    |  12 +-
 .../org/apache/beam/sdk/transforms/Max.java     | 112 +++++++++++--------
 .../org/apache/beam/sdk/transforms/Mean.java    |  13 ++-
 .../org/apache/beam/sdk/transforms/Min.java     | 110 ++++++++++--------
 .../org/apache/beam/sdk/transforms/Sum.java     |  57 ++++++----
 .../windowing/AfterDelayFromFirstElement.java   |   2 +-
 .../sdk/transforms/windowing/AfterPane.java     |   2 +-
 .../sdk/AggregatorPipelineExtractorTest.java    |  16 +--
 .../beam/sdk/transforms/CombineFnsTest.java     |  20 ++--
 .../apache/beam/sdk/transforms/DoFnTest.java    |  15 ++-
 .../beam/sdk/transforms/DoFnTesterTest.java     |   6 +-
 .../org/apache/beam/sdk/transforms/MaxTest.java |   6 +-
 .../apache/beam/sdk/transforms/MeanTest.java    |   2 +-
 .../org/apache/beam/sdk/transforms/MinTest.java |   6 +-
 .../beam/sdk/transforms/OldDoFnContextTest.java |   2 +-
 .../apache/beam/sdk/transforms/OldDoFnTest.java |  11 +-
 .../beam/sdk/transforms/SimpleStatsFnsTest.java |  36 +++---
 .../org/apache/beam/sdk/transforms/SumTest.java |  12 +-
 .../apache/beam/sdk/transforms/ViewTest.java    |   2 +-
 .../apache/beam/sdk/util/CombineFnUtilTest.java |   8 +-
 .../util/state/InMemoryStateInternalsTest.java  |   2 +-
 .../beam/sdk/util/state/StateTagTest.java       |  11 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |   2 +-
 56 files changed, 339 insertions(+), 310 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
index f7c537c..997d590 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
@@ -95,9 +95,9 @@ public class DebuggingWordCount {
      * in a dashboard, etc.
      */
     private final Aggregator<Long, Long> matchedWords =
-        createAggregator("matchedWords", new Sum.SumLongFn());
+        createAggregator("matchedWords", Sum.ofLongs());
     private final Aggregator<Long, Long> unmatchedWords =
-        createAggregator("umatchedWords", new Sum.SumLongFn());
+        createAggregator("umatchedWords", Sum.ofLongs());
 
     @ProcessElement
     public void processElement(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
index 224d7db..7e21d47 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
@@ -87,7 +87,7 @@ public class WordCount {
    */
   static class ExtractWordsFn extends DoFn<String, String> {
     private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", new Sum.SumLongFn());
+        createAggregator("emptyLines", Sum.ofLongs());
 
     @ProcessElement
     public void processElement(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
index 29655ea..37f9d79 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
@@ -80,7 +80,7 @@ public class CombinePerKeyExamples {
    */
   static class ExtractLargeWordsFn extends DoFn<TableRow, KV<String, String>> {
     private final Aggregator<Long, Long> smallerWords =
-        createAggregator("smallerWords", new Sum.SumLongFn());
+        createAggregator("smallerWords", Sum.ofLongs());
 
     @ProcessElement
     public void processElement(ProcessContext c){

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index 6ad6a23..74f1b30 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -127,7 +127,7 @@ public class GameStats extends LeaderBoard {
               .withSideInputs(globalMeanScore)
               .of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
                 private final Aggregator<Long, Long> numSpammerUsers =
-                  createAggregator("SpammerUsers", new Sum.SumLongFn());
+                  createAggregator("SpammerUsers", Sum.ofLongs());
                 @ProcessElement
                 public void processElement(ProcessContext c) {
                   Integer score = c.element().getValue();

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
index cb81a7e..7dd5a8e 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
@@ -126,7 +126,7 @@ public class UserScore {
     // Log and count parse errors.
     private static final Logger LOG = LoggerFactory.getLogger(ParseEventFn.class);
     private final Aggregator<Long, Long> numParseErrors =
-        createAggregator("ParseErrors", new Sum.SumLongFn());
+        createAggregator("ParseErrors", Sum.ofLongs());
 
     @ProcessElement
     public void processElement(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
index 28bb8ad..a1713ac 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
@@ -68,7 +68,7 @@ public class WordCountTest {
   static class ExtractWordsFn extends DoFn<String, String> {
     private static final long serialVersionUID = 1L;
     private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", new Sum.SumLongFn());
+        createAggregator("emptyLines", Sum.ofLongs());
 
     @ProcessElement
     public void processElement(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
index 1801358..4d797f1 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
@@ -62,7 +62,7 @@ public class ApexStateInternalsTest {
       StateTags.value("stringValue", StringUtf8Coder.of());
   private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>>
       SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
-          "sumInteger", VarIntCoder.of(), new Sum.SumIntegerFn());
+          "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
   private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
       StateTags.bag("stringBag", StringUtf8Coder.of());
   private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index 14171b3..d79683a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -50,9 +50,9 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
 
   protected final Aggregator<Long, Long> droppedDueToClosedWindow =
       createAggregator(
-          GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, new Sum.SumLongFn());
+          GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
   protected final Aggregator<Long, Long> droppedDueToLateness =
-      createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, new Sum.SumLongFn());
+      createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
 
   private final WindowingStrategy<Object, W> windowingStrategy;
   private final StateInternalsFactory<K> stateInternalsFactory;

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
index 1b32d84..9a2f8fd 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
@@ -41,7 +41,7 @@ public abstract class GroupAlsoByWindowsDoFn<K, InputT, OutputT, W extends Bound
   public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness";
 
   protected final Aggregator<Long, Long> droppedDueToClosedWindow =
-      createAggregator(DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, new Sum.SumLongFn());
+      createAggregator(DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
   protected final Aggregator<Long, Long> droppedDueToLateness =
-      createAggregator(DROPPED_DUE_TO_LATENESS_COUNTER, new Sum.SumLongFn());
+      createAggregator(DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
index 3e51dfb..0a6fd93 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
@@ -121,7 +121,7 @@ public abstract class NonEmptyPanes<K, W extends BoundedWindow> {
     private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Long>>
         PANE_ADDITIONS_TAG =
         StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
-            "count", VarLongCoder.of(), new Sum.SumLongFn()));
+            "count", VarLongCoder.of(), Sum.ofLongs()));
 
     @Override
     public void recordContent(StateAccessor<K> state) {

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
index c8922df..b60c690 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
@@ -56,7 +56,7 @@ public abstract class AfterDelayFromFirstElementStateMachine extends OnceTrigger
   protected static final StateTag<Object, AccumulatorCombiningState<Instant,
                                               Combine.Holder<Instant>, Instant>> DELAYED_UNTIL_TAG =
       StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
-          "delayed", InstantCoder.of(), Min.MinFn.<Instant>naturalOrder()));
+          "delayed", InstantCoder.of(), Min.<Instant>naturalOrder()));
 
   private static final PeriodFormatter PERIOD_FORMATTER = PeriodFormat.wordBased(Locale.ENGLISH);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
index 38b95f9..d8ad370 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
@@ -38,7 +38,7 @@ public class AfterPaneStateMachine extends OnceTriggerStateMachine {
 private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Long>>
       ELEMENTS_IN_PANE_TAG =
       StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
-          "count", VarLongCoder.of(), new Sum.SumLongFn()));
+          "count", VarLongCoder.of(), Sum.ofLongs()));
 
   private final int countElems;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
index 3cd5d4a..efe2044 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
@@ -114,7 +114,7 @@ public class LateDataDroppingDoFnRunnerTest {
 
     @Override
     public CombineFn<Long, ?, Long> getCombineFn() {
-      return new Sum.SumLongFn();
+      return Sum.ofLongs();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 4abfc9a..1bd717f 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -217,7 +217,7 @@ public class ReduceFnRunnerTest {
         ReduceFnTester.combining(
             strategy,
             mockTriggerStateMachine,
-            new Sum.SumIntegerFn().<String>asKeyedFn(),
+            Sum.ofIntegers().<String>asKeyedFn(),
             VarIntCoder.of());
 
     injectElement(tester, 2);
@@ -290,7 +290,7 @@ public class ReduceFnRunnerTest {
 
     ReduceFnTester<Integer, Integer, IntervalWindow> tester =
         ReduceFnTester
-            .combining(strategy, new Sum.SumIntegerFn().<String>asKeyedFn(), VarIntCoder.of());
+            .combining(strategy, Sum.ofIntegers().<String>asKeyedFn(), VarIntCoder.of());
 
     tester.injectElements(TimestampedValue.of(13, elementTimestamp));
 
@@ -322,7 +322,7 @@ public class ReduceFnRunnerTest {
         ReduceFnTester.combining(
             strategy,
             mockTriggerStateMachine,
-            new Sum.SumIntegerFn().<String>asKeyedFn(),
+            Sum.ofIntegers().<String>asKeyedFn(),
             VarIntCoder.of());
 
     injectElement(tester, 1);
@@ -1069,7 +1069,7 @@ public class ReduceFnRunnerTest {
             SlidingWindows.of(Duration.millis(100)).every(Duration.millis(30)))
         .withTrigger(AfterWatermark.pastEndOfWindow())
         .withAllowedLateness(Duration.millis(1000)),
-        new Sum.SumIntegerFn().<String>asKeyedFn(), VarIntCoder.of());
+        Sum.ofIntegers().<String>asKeyedFn(), VarIntCoder.of());
 
     tester.injectElements(
         // assigned to [-60, 40), [-30, 70), [0, 100)
@@ -1212,7 +1212,7 @@ public class ReduceFnRunnerTest {
 
     ReduceFnTester<Integer, Integer, IntervalWindow> tester =
         ReduceFnTester
-            .combining(strategy, new Sum.SumIntegerFn().<String>asKeyedFn(), VarIntCoder.of());
+            .combining(strategy, Sum.ofIntegers().<String>asKeyedFn(), VarIntCoder.of());
 
     tester.advanceInputWatermark(new Instant(0));
     tester.advanceProcessingTime(new Instant(0));
@@ -1268,7 +1268,7 @@ public class ReduceFnRunnerTest {
 
     ReduceFnTester<Integer, Integer, IntervalWindow> tester =
         ReduceFnTester
-            .combining(strategy, new Sum.SumIntegerFn().<String>asKeyedFn(), VarIntCoder.of());
+            .combining(strategy, Sum.ofIntegers().<String>asKeyedFn(), VarIntCoder.of());
 
     tester.advanceInputWatermark(new Instant(0));
     tester.advanceProcessingTime(new Instant(0));

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 890195a..226f5f0 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -638,7 +638,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
 
     @Override
     public CombineFn<Long, ?, Long> getCombineFn() {
-      return new Sum.SumLongFn();
+      return Sum.ofLongs();
     }
 
     public long getSum() {

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index bb11923..2bc0d8d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -147,10 +147,10 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
       reduceFn = SystemReduceFn.buffering(valueCoder);
       droppedDueToClosedWindow = aggregatorChanges.createSystemAggregator(stepContext,
           GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER,
-          new Sum.SumLongFn());
+          Sum.ofLongs());
       droppedDueToLateness = aggregatorChanges.createSystemAggregator(stepContext,
           GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER,
-          new Sum.SumLongFn());
+          Sum.ofLongs());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
index f770800..37524eb 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
@@ -26,7 +26,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.runners.core.ExecutionContext.StepContext;
 import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Sum.SumIntegerFn;
+import org.apache.beam.sdk.transforms.Sum;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -61,13 +61,13 @@ public class AggregatorContainerTest {
   @Test
   public void addsAggregatorsOnCommit() {
     AggregatorContainer.Mutator mutator = container.createMutator();
-    mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", new SumIntegerFn()).addValue(5);
+    mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(5);
     mutator.commit();
 
     assertThat((Integer) container.getAggregate(STEP_NAME, "sum_int"), equalTo(5));
 
     mutator = container.createMutator();
-    mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", new SumIntegerFn()).addValue(8);
+    mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(8);
 
     assertThat("Shouldn't update value until commit",
         (Integer) container.getAggregate(STEP_NAME, "sum_int"), equalTo(5));
@@ -81,14 +81,14 @@ public class AggregatorContainerTest {
     mutator.commit();
 
     thrown.expect(IllegalStateException.class);
-    mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", new SumIntegerFn()).addValue(5);
+    mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(5);
   }
 
   @Test
   public void failToAddValueAfterCommit() {
     AggregatorContainer.Mutator mutator = container.createMutator();
     Aggregator<Integer, ?> aggregator =
-        mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", new SumIntegerFn());
+        mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", Sum.ofIntegers());
     mutator.commit();
 
     thrown.expect(IllegalStateException.class);
@@ -99,12 +99,12 @@ public class AggregatorContainerTest {
   public void failToAddValueAfterCommitWithPrevious() {
     AggregatorContainer.Mutator mutator = container.createMutator();
     mutator.createAggregatorForDoFn(
-        fn, stepContext, "sum_int", new SumIntegerFn()).addValue(5);
+        fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(5);
     mutator.commit();
 
     mutator = container.createMutator();
     Aggregator<Integer, ?> aggregator = mutator.createAggregatorForDoFn(
-        fn, stepContext, "sum_int", new SumIntegerFn());
+        fn, stepContext, "sum_int", Sum.ofIntegers());
     mutator.commit();
 
     thrown.expect(IllegalStateException.class);
@@ -123,7 +123,7 @@ public class AggregatorContainerTest {
         @Override
         public void run() {
           mutator.createAggregatorForDoFn(
-              fn, stepContext, "sum_int", new SumIntegerFn()).addValue(value);
+              fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(value);
           mutator.commit();
         }
       });

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
index 12ef66c..4284f3e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
@@ -167,7 +167,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
   public void testAccumulatorCombiningStateWithUnderlying() throws CannotProvideCoderException {
     CopyOnAccessInMemoryStateInternals<String> underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-    CombineFn<Long, long[], Long> sumLongFn = new Sum.SumLongFn();
+    CombineFn<Long, long[], Long> sumLongFn = Sum.ofLongs();
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
     CoderRegistry reg = pipeline.getCoderRegistry();
@@ -197,7 +197,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
   public void testKeyedAccumulatorCombiningStateWithUnderlying() throws Exception {
     CopyOnAccessInMemoryStateInternals<String> underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-    KeyedCombineFn<String, Long, long[], Long> sumLongFn = new Sum.SumLongFn().asKeyedFn();
+    KeyedCombineFn<String, Long, long[], Long> sumLongFn = Sum.ofLongs().asKeyedFn();
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
     CoderRegistry reg = pipeline.getCoderRegistry();

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 15340da..ad6e32d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -45,7 +45,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.Sum.SumLongFn;
+import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -250,7 +250,7 @@ public class EvaluationContextTest {
         "STEP", createdProducer.getTransform().getName());
     AggregatorContainer container = context.getAggregatorContainer();
     AggregatorContainer.Mutator mutator = container.createMutator();
-    mutator.createAggregatorForDoFn(fn, stepContext, "foo", new SumLongFn()).addValue(4L);
+    mutator.createAggregatorForDoFn(fn, stepContext, "foo", Sum.ofLongs()).addValue(4L);
 
     TransformResult<?> result =
         StepTransformResult.withoutHold(createdProducer)
@@ -260,7 +260,7 @@ public class EvaluationContextTest {
     assertThat((Long) context.getAggregatorContainer().getAggregate("STEP", "foo"), equalTo(4L));
 
     AggregatorContainer.Mutator mutatorAgain = container.createMutator();
-    mutatorAgain.createAggregatorForDoFn(fn, stepContext, "foo", new SumLongFn()).addValue(12L);
+    mutatorAgain.createAggregatorForDoFn(fn, stepContext, "foo", Sum.ofLongs()).addValue(12L);
 
     TransformResult<?> secondResult =
         StepTransformResult.withoutHold(downstreamProducer)

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
index b6b3c1a..6ae4cf8 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
@@ -46,7 +46,7 @@ public class WordCount {
    */
   public static class ExtractWordsFn extends DoFn<String, String> {
     private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", new Sum.SumLongFn());
+        createAggregator("emptyLines", Sum.ofLongs());
 
     @ProcessElement
     public void processElement(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
index 3405981..f33e616 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
@@ -314,7 +314,7 @@ public class AutoComplete {
 
   static class ExtractWordsFn extends DoFn<String, String> {
     private final Aggregator<Long, Long> emptyLines =
-            createAggregator("emptyLines", new Sum.SumLongFn());
+            createAggregator("emptyLines", Sum.ofLongs());
 
     @ProcessElement
     public void processElement(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
index 42c42f3..ee0e874 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
@@ -55,7 +55,7 @@ public class KafkaWindowedWordCountExample {
    */
   public static class ExtractWordsFn extends DoFn<String, String> {
     private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", new Sum.SumLongFn());
+        createAggregator("emptyLines", Sum.ofLongs());
 
     @ProcessElement
     public void processElement(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
index 2246bdd..792c214 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
@@ -66,7 +66,7 @@ public class WindowedWordCount {
 
   static class ExtractWordsFn extends DoFn<String, String> {
     private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", new Sum.SumLongFn());
+        createAggregator("emptyLines", Sum.ofLongs());
 
     @ProcessElement
     public void processElement(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
index 628212a..126f611 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
@@ -70,7 +70,7 @@ public class FlinkStateInternalsTest {
       StateTags.value("stringValue", StringUtf8Coder.of());
   private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>>
       SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
-          "sumInteger", VarIntCoder.of(), new Sum.SumIntegerFn());
+          "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
   private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
       StateTags.bag("stringBag", StringUtf8Coder.of());
   private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 1890da1..6999e03 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -420,7 +420,7 @@ public class DataflowPipelineJobTest {
   @Test
   public void testGetAggregatorValuesWithSingleMetricUpdateReturnsSingletonCollection()
       throws IOException, AggregatorRetrievalException {
-    CombineFn<Long, long[], Long> combineFn = new Sum.SumLongFn();
+    CombineFn<Long, long[], Long> combineFn = Sum.ofLongs();
     String aggregatorName = "agg";
     Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
     @SuppressWarnings("unchecked")
@@ -468,7 +468,7 @@ public class DataflowPipelineJobTest {
   @Test
   public void testGetAggregatorValuesWithMultipleMetricUpdatesReturnsCollection()
       throws IOException, AggregatorRetrievalException {
-    CombineFn<Long, long[], Long> combineFn = new Sum.SumLongFn();
+    CombineFn<Long, long[], Long> combineFn = Sum.ofLongs();
     String aggregatorName = "agg";
     Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
 
@@ -536,7 +536,7 @@ public class DataflowPipelineJobTest {
   @Test
   public void testGetAggregatorValuesWithUnrelatedMetricUpdateIgnoresUpdate()
       throws IOException, AggregatorRetrievalException {
-    CombineFn<Long, long[], Long> combineFn = new Sum.SumLongFn();
+    CombineFn<Long, long[], Long> combineFn = Sum.ofLongs();
     String aggregatorName = "agg";
     Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
     @SuppressWarnings("unchecked")
@@ -600,7 +600,7 @@ public class DataflowPipelineJobTest {
   @Test
   public void testGetAggregatorValuesWhenClientThrowsExceptionThrowsAggregatorRetrievalException()
       throws IOException, AggregatorRetrievalException {
-    CombineFn<Long, long[], Long> combineFn = new Sum.SumLongFn();
+    CombineFn<Long, long[], Long> combineFn = Sum.ofLongs();
     String aggregatorName = "agg";
     Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
index 4e96466..52fe994 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
@@ -153,7 +153,9 @@ public class NamedAggregators implements Serializable {
   }
 
   /**
-   * =&gt; combineFunction in data flow.
+   * @param <InputT>    Input data type
+   * @param <InterT> Intermediate data type (useful for averages)
+   * @param <OutputT>   Output data type
    */
   public static class CombineFunctionState<InputT, InterT, OutputT>
       implements State<InputT, InterT, OutputT> {

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
index 1252d12..da14ee2 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
@@ -46,7 +46,7 @@ public class WordCount {
    */
   static class ExtractWordsFn extends DoFn<String, String> {
     private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", new Sum.SumLongFn());
+        createAggregator("emptyLines", Sum.ofLongs());
 
     @ProcessElement
     public void processElement(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
index 5432d58..b615132 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
@@ -78,7 +78,7 @@ public class SparkGroupAlsoByWindowFn<K, InputT, W extends BoundedWindow>
     droppedDueToClosedWindow = runtimeContext.createAggregator(
         accumulator,
         GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER,
-        new Sum.SumLongFn());
+        Sum.ofLongs());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
index 01b6b54..9c3d79f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
@@ -32,10 +32,6 @@ import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Max;
-import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.spark.Accumulator;
 
 /**
@@ -92,18 +88,26 @@ public class SparkRuntimeContext implements Serializable {
       Combine.CombineFn<? super InputT, InterT, OutputT> combineFn) {
     @SuppressWarnings("unchecked")
     Aggregator<InputT, OutputT> aggregator = (Aggregator<InputT, OutputT>) aggregators.get(named);
-    if (aggregator == null) {
-      @SuppressWarnings("unchecked")
-      NamedAggregators.CombineFunctionState<InputT, InterT, OutputT> state =
-          new NamedAggregators.CombineFunctionState<>(
-              (Combine.CombineFn<InputT, InterT, OutputT>) combineFn,
-              (Coder<InputT>) getCoder(combineFn),
-              this);
-      accum.add(new NamedAggregators(named, state));
-      aggregator = new SparkAggregator<>(named, state);
-      aggregators.put(named, aggregator);
+    try {
+      if (aggregator == null) {
+        @SuppressWarnings("unchecked")
+        final
+        NamedAggregators.CombineFunctionState<InputT, InterT, OutputT> state =
+            new NamedAggregators.CombineFunctionState<>(
+                (Combine.CombineFn<InputT, InterT, OutputT>) combineFn,
+                // hidden assumption: InputT == OutputT (coder is resolved from the output type)
+                (Coder<InputT>) getCoderRegistry().getCoder(combineFn.getOutputType()),
+                this);
+
+        accum.add(new NamedAggregators(named, state));
+        aggregator = new SparkAggregator<>(named, state);
+        aggregators.put(named, aggregator);
+      }
+      return aggregator;
+    } catch (CannotProvideCoderException e) {
+      throw new RuntimeException(String.format("Unable to create an aggregator named: [%s]", named),
+                                 e);
     }
-    return aggregator;
   }
 
   public CoderRegistry getCoderRegistry() {
@@ -114,35 +118,6 @@ public class SparkRuntimeContext implements Serializable {
     return coderRegistry;
   }
 
-  private Coder<?> getCoder(Combine.CombineFn<?, ?, ?> combiner) {
-    try {
-      if (combiner.getClass() == Sum.SumIntegerFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
-      } else if (combiner.getClass() == Sum.SumLongFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
-      } else if (combiner.getClass() == Sum.SumDoubleFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
-      } else if (combiner.getClass() == Min.MinIntegerFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
-      } else if (combiner.getClass() == Min.MinLongFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
-      } else if (combiner.getClass() == Min.MinDoubleFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
-      } else if (combiner.getClass() == Max.MaxIntegerFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
-      } else if (combiner.getClass() == Max.MaxLongFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
-      } else if (combiner.getClass() == Max.MaxDoubleFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
-      } else {
-        throw new IllegalArgumentException("unsupported combiner in Aggregator: "
-            + combiner.getClass().getName());
-      }
-    } catch (CannotProvideCoderException e) {
-      throw new IllegalStateException("Could not determine default coder for combiner", e);
-    }
-  }
-
   /**
    * Initialize spark aggregators exactly once.
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index 2718b5f..ab04c5c 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -182,7 +182,7 @@ public class ResumeFromCheckpointStreamingTest {
   private static class FormatAsText extends DoFn<KV<String, String>, String> {
 
     private final Aggregator<Long, Long> aggregator =
-        createAggregator("processedMessages", new Sum.SumLongFn());
+        createAggregator("processedMessages", Sum.ofLongs());
 
     @ProcessElement
     public void process(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
index 0284b3d..cd9de92 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
@@ -97,9 +97,9 @@ public final class PAssertStreaming implements Serializable {
 
   private static class AssertDoFn<T> extends DoFn<Iterable<T>, Void> {
     private final Aggregator<Integer, Integer> success =
-        createAggregator(PAssert.SUCCESS_COUNTER, new Sum.SumIntegerFn());
+        createAggregator(PAssert.SUCCESS_COUNTER, Sum.ofIntegers());
     private final Aggregator<Integer, Integer> failure =
-        createAggregator(PAssert.FAILURE_COUNTER, new Sum.SumIntegerFn());
+        createAggregator(PAssert.FAILURE_COUNTER, Sum.ofIntegers());
     private final T[] expected;
 
     AssertDoFn(T[] expected) {

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
index 58414c6..75f6b7d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
@@ -154,7 +154,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
    */
   private static class ShardFn<T> extends DoFn<T, KV<Integer, OutgoingMessage>> {
     private final Aggregator<Long, Long> elementCounter =
-        createAggregator("elements", new Sum.SumLongFn());
+        createAggregator("elements", Sum.ofLongs());
     private final Coder<T> elementCoder;
     private final int numShards;
     private final RecordIdMethod recordIdMethod;
@@ -219,11 +219,11 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
     private transient PubsubClient pubsubClient;
 
     private final Aggregator<Long, Long> batchCounter =
-        createAggregator("batches", new Sum.SumLongFn());
+        createAggregator("batches", Sum.ofLongs());
     private final Aggregator<Long, Long> elementCounter =
-        createAggregator("elements", new Sum.SumLongFn());
+        createAggregator("elements", Sum.ofLongs());
     private final Aggregator<Long, Long> byteCounter =
-        createAggregator("bytes", new Sum.SumLongFn());
+        createAggregator("bytes", Sum.ofLongs());
 
     WriterFn(
         PubsubClientFactory pubsubFactory, ValueProvider<TopicPath> topic,

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
index da3b437..4b3792d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
@@ -59,7 +59,6 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.Sum.SumLongFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -216,7 +215,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
         }
       };
 
-  private static final Combine.BinaryCombineLongFn SUM = new SumLongFn();
+  private static final Combine.BinaryCombineLongFn SUM = Sum.ofLongs();
 
   // ================================================================================
   // Checkpoint
@@ -1159,7 +1158,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
 
   private static class StatsFn<T> extends DoFn<T, T> {
     private final Aggregator<Long, Long> elementCounter =
-        createAggregator("elements", new Sum.SumLongFn());
+        createAggregator("elements", Sum.ofLongs());
 
     private final PubsubClientFactory pubsubFactory;
     private final ValueProvider<SubscriptionPath> subscription;

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index b23f4f3..b57f4a9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -1019,9 +1019,9 @@ public class PAssert {
   private static class SideInputCheckerDoFn<ActualT> extends DoFn<Integer, Void> {
     private final SerializableFunction<ActualT, Void> checkerFn;
     private final Aggregator<Integer, Integer> success =
-        createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
+        createAggregator(SUCCESS_COUNTER, Sum.ofIntegers());
     private final Aggregator<Integer, Integer> failure =
-        createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
+        createAggregator(FAILURE_COUNTER, Sum.ofIntegers());
     private final PCollectionView<ActualT> actual;
 
     private SideInputCheckerDoFn(
@@ -1054,9 +1054,9 @@ public class PAssert {
   private static class GroupedValuesCheckerDoFn<ActualT> extends DoFn<ActualT, Void> {
     private final SerializableFunction<ActualT, Void> checkerFn;
     private final Aggregator<Integer, Integer> success =
-        createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
+        createAggregator(SUCCESS_COUNTER, Sum.ofIntegers());
     private final Aggregator<Integer, Integer> failure =
-        createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
+        createAggregator(FAILURE_COUNTER, Sum.ofIntegers());
 
     private GroupedValuesCheckerDoFn(SerializableFunction<ActualT, Void> checkerFn) {
       this.checkerFn = checkerFn;
@@ -1079,9 +1079,9 @@ public class PAssert {
   private static class SingletonCheckerDoFn<ActualT> extends DoFn<Iterable<ActualT>, Void> {
     private final SerializableFunction<ActualT, Void> checkerFn;
     private final Aggregator<Integer, Integer> success =
-        createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
+        createAggregator(SUCCESS_COUNTER, Sum.ofIntegers());
     private final Aggregator<Integer, Integer> failure =
-        createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
+        createAggregator(FAILURE_COUNTER, Sum.ofIntegers());
 
     private SingletonCheckerDoFn(SerializableFunction<ActualT, Void> checkerFn) {
       this.checkerFn = checkerFn;

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
index 0990ca4..696bed9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
@@ -110,13 +110,69 @@ public class Max {
   }
 
   /**
+   * A {@code CombineFn} that computes the maximum of a collection of {@code Integer}s, useful as an
+   * argument to {@link Combine#globally} or {@link Combine#perKey}.
+   */
+  public static Combine.BinaryCombineIntegerFn ofIntegers() {
+    return new Max.MaxIntegerFn();
+  }
+
+  /**
+   * A {@code CombineFn} that computes the maximum of a collection of {@code Long}s, useful as an
+   * argument to {@link Combine#globally} or {@link Combine#perKey}.
+   */
+  public static Combine.BinaryCombineLongFn ofLongs() {
+    return new Max.MaxLongFn();
+  }
+
+  /**
+   * A {@code CombineFn} that computes the maximum of a collection of {@code Double}s, useful as an
+   * argument to {@link Combine#globally} or {@link Combine#perKey}.
+   */
+  public static Combine.BinaryCombineDoubleFn ofDoubles() {
+    return new Max.MaxDoubleFn();
+  }
+
+  /**
+   * A {@code CombineFn} that computes the maximum of a collection of elements of type {@code T}
+   * using an arbitrary {@link Comparator} and {@code identity},
+   * useful as an argument to {@link Combine#globally} or {@link Combine#perKey}.
+   *
+   * @param <T> the type of the values being compared
+   */
+  public static <T, ComparatorT extends Comparator<? super T> & Serializable>
+  BinaryCombineFn<T> of(final T identity, final ComparatorT comparator) {
+    return new MaxFn<T>(identity, comparator);
+  }
+
+  /**
+   * A {@code CombineFn} that computes the maximum of a collection of elements of type {@code T}
+   * using an arbitrary {@link Comparator}, useful as an argument to {@link Combine#globally} or
+   * {@link Combine#perKey}.
+   *
+   * @param <T> the type of the values being compared
+   */
+  public static <T, ComparatorT extends Comparator<? super T> & Serializable>
+  BinaryCombineFn<T> of(final ComparatorT comparator) {
+    return new MaxFn<T>(null, comparator);
+  }
+
+  public static <T extends Comparable<? super T>> BinaryCombineFn<T> naturalOrder(T identity) {
+    return new MaxFn<T>(identity, new Top.Largest<T>());
+  }
+
+  public static <T extends Comparable<? super T>> BinaryCombineFn<T> naturalOrder() {
+    return new MaxFn<T>(null, new Top.Largest<T>());
+  }
+
+  /**
    * Returns a {@code PTransform} that takes an input {@code PCollection<T>} and returns a {@code
    * PCollection<T>} whose contents is the maximum according to the natural ordering of {@code T}
    * of the input {@code PCollection}'s elements, or {@code null} if there are no elements.
    */
   public static <T extends Comparable<? super T>>
   Combine.Globally<T, T> globally() {
-    return Combine.<T, T>globally(MaxFn.<T>naturalOrder());
+    return Combine.<T, T>globally(Max.<T>naturalOrder());
   }
 
   /**
@@ -129,7 +185,7 @@ public class Max {
    */
   public static <K, T extends Comparable<? super T>>
   Combine.PerKey<K, T, T> perKey() {
-    return Combine.<K, T, T>perKey(MaxFn.<T>naturalOrder());
+    return Combine.<K, T, T>perKey(Max.<T>naturalOrder());
   }
 
   /**
@@ -139,7 +195,7 @@ public class Max {
    */
   public static <T, ComparatorT extends Comparator<? super T> & Serializable>
   Combine.Globally<T, T> globally(ComparatorT comparator) {
-    return Combine.<T, T>globally(MaxFn.of(comparator));
+    return Combine.<T, T>globally(Max.of(comparator));
   }
 
   /**
@@ -151,19 +207,12 @@ public class Max {
    */
   public static <K, T, ComparatorT extends Comparator<? super T> & Serializable>
   Combine.PerKey<K, T, T> perKey(ComparatorT comparator) {
-    return Combine.<K, T, T>perKey(MaxFn.of(comparator));
+    return Combine.<K, T, T>perKey(Max.of(comparator));
   }
 
   /////////////////////////////////////////////////////////////////////////////
 
-  /**
-   * A {@code CombineFn} that computes the maximum of a collection of elements of type {@code T}
-   * using an arbitrary {@link Comparator}, useful as an argument to {@link Combine#globally} or
-   * {@link Combine#perKey}.
-   *
-   * @param <T> the type of the values being compared
-   */
-  public static class MaxFn<T> extends BinaryCombineFn<T> {
+  private static class MaxFn<T> extends BinaryCombineFn<T> {
 
     private final T identity;
     private final Comparator<? super T> comparator;
@@ -174,24 +223,6 @@ public class Max {
       this.comparator = comparator;
     }
 
-    public static <T, ComparatorT extends Comparator<? super T> & Serializable>
-    MaxFn<T> of(T identity, ComparatorT comparator) {
-      return new MaxFn<T>(identity, comparator);
-    }
-
-    public static <T, ComparatorT extends Comparator<? super T> & Serializable>
-    MaxFn<T> of(ComparatorT comparator) {
-      return new MaxFn<T>(null, comparator);
-    }
-
-    public static <T extends Comparable<? super T>> MaxFn<T> naturalOrder(T identity) {
-      return new MaxFn<T>(identity, new Top.Largest<T>());
-    }
-
-    public static <T extends Comparable<? super T>> MaxFn<T> naturalOrder() {
-      return new MaxFn<T>(null, new Top.Largest<T>());
-    }
-
     @Override
     public T identity() {
       return identity;
@@ -210,11 +241,8 @@ public class Max {
     }
   }
 
-  /**
-   * A {@code CombineFn} that computes the maximum of a collection of {@code Integer}s, useful as an
-   * argument to {@link Combine#globally} or {@link Combine#perKey}.
-   */
-  public static class MaxIntegerFn extends Combine.BinaryCombineIntegerFn {
+  private static class MaxIntegerFn extends Combine.BinaryCombineIntegerFn {
+
     @Override
     public int apply(int left, int right) {
       return left >= right ? left : right;
@@ -226,11 +254,8 @@ public class Max {
     }
   }
 
-  /**
-   * A {@code CombineFn} that computes the maximum of a collection of {@code Long}s, useful as an
-   * argument to {@link Combine#globally} or {@link Combine#perKey}.
-   */
-  public static class MaxLongFn extends Combine.BinaryCombineLongFn {
+  private static class MaxLongFn extends Combine.BinaryCombineLongFn {
+
     @Override
     public long apply(long left, long right) {
       return left >= right ? left : right;
@@ -242,11 +267,8 @@ public class Max {
     }
   }
 
-  /**
-   * A {@code CombineFn} that computes the maximum of a collection of {@code Double}s, useful as an
-   * argument to {@link Combine#globally} or {@link Combine#perKey}.
-   */
-  public static class MaxDoubleFn extends Combine.BinaryCombineDoubleFn {
+  private static class MaxDoubleFn extends Combine.BinaryCombineDoubleFn {
+
     @Override
     public double apply(double left, double right) {
       return left >= right ? left : right;

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
index cb77ba3..7e62e9d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
@@ -84,9 +84,6 @@ public class Mean {
     return Combine.<K, NumT, Double>perKey(new MeanFn<>());
   }
 
-
-  /////////////////////////////////////////////////////////////////////////////
-
   /**
    * A {@code Combine.CombineFn} that computes the arithmetic mean
    * (a.k.a. average) of an {@code Iterable} of numbers of type
@@ -97,13 +94,19 @@ public class Mean {
    *
    * @param <NumT> the type of the {@code Number}s being combined
    */
-  static class MeanFn<NumT extends Number>
+  public static <NumT extends Number>
+  Combine.AccumulatingCombineFn<NumT, CountSum<NumT>, Double> of() {
+    return new MeanFn<>();
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  private static class MeanFn<NumT extends Number>
   extends Combine.AccumulatingCombineFn<NumT, CountSum<NumT>, Double> {
     /**
      * Constructs a combining function that computes the mean over
      * a collection of values of type {@code N}.
      */
-    public MeanFn() {}
 
     @Override
     public CountSum<NumT> createAccumulator() {

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
index 5003594..b208929 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
@@ -110,13 +110,69 @@ public class Min {
   }
 
   /**
+   * A {@code CombineFn} that computes the minimum of a collection of {@code Integer}s, useful as an
+   * argument to {@link Combine#globally} or {@link Combine#perKey}.
+   */
+  public static Combine.BinaryCombineIntegerFn ofIntegers() {
+    return new Min.MinIntegerFn();
+  }
+
+  /**
+   * A {@code CombineFn} that computes the minimum of a collection of {@code Long}s, useful as an
+   * argument to {@link Combine#globally} or {@link Combine#perKey}.
+   */
+  public static Combine.BinaryCombineLongFn ofLongs() {
+    return new Min.MinLongFn();
+  }
+
+  /**
+   * A {@code CombineFn} that computes the minimum of a collection of {@code Double}s, useful as an
+   * argument to {@link Combine#globally} or {@link Combine#perKey}.
+   */
+  public static Combine.BinaryCombineDoubleFn ofDoubles() {
+    return new Min.MinDoubleFn();
+  }
+
+  /**
+   * A {@code CombineFn} that computes the minimum of a collection of elements of type {@code T}
+   * using an arbitrary {@link Comparator} and an {@code identity},
+   * useful as an argument to {@link Combine#globally} or {@link Combine#perKey}.
+   *
+   * @param <T> the type of the values being compared
+   */
+  public static <T, ComparatorT extends Comparator<? super T> & Serializable>
+  BinaryCombineFn<T> of(T identity, ComparatorT comparator) {
+    return new MinFn<T>(identity, comparator);
+  }
+
+  /**
+   * A {@code CombineFn} that computes the minimum of a collection of elements of type {@code T}
+   * using an arbitrary {@link Comparator}, useful as an argument to {@link Combine#globally} or
+   * {@link Combine#perKey}.
+   *
+   * @param <T> the type of the values being compared
+   */
+  public static <T, ComparatorT extends Comparator<? super T> & Serializable>
+  BinaryCombineFn<T> of(ComparatorT comparator) {
+    return new MinFn<T>(null, comparator);
+  }
+
+  public static <T extends Comparable<? super T>> BinaryCombineFn<T> naturalOrder(T identity) {
+    return new MinFn<T>(identity, new Top.Largest<T>());
+  }
+
+  public static <T extends Comparable<? super T>> BinaryCombineFn<T> naturalOrder() {
+    return new MinFn<T>(null, new Top.Largest<T>());
+  }
+
+  /**
    * Returns a {@code PTransform} that takes an input {@code PCollection<T>} and returns a {@code
    * PCollection<T>} whose contents is the minimum according to the natural ordering of {@code T}
    * of the input {@code PCollection}'s elements, or {@code null} if there are no elements.
    */
   public static <T extends Comparable<? super T>>
   Combine.Globally<T, T> globally() {
-    return Combine.<T, T>globally(MinFn.<T>naturalOrder());
+    return Combine.<T, T>globally(Min.<T>naturalOrder());
   }
 
   /**
@@ -129,7 +185,7 @@ public class Min {
    */
   public static <K, T extends Comparable<? super T>>
   Combine.PerKey<K, T, T> perKey() {
-    return Combine.<K, T, T>perKey(MinFn.<T>naturalOrder());
+    return Combine.<K, T, T>perKey(Min.<T>naturalOrder());
   }
 
   /**
@@ -139,7 +195,7 @@ public class Min {
    */
   public static <T, ComparatorT extends Comparator<? super T> & Serializable>
   Combine.Globally<T, T> globally(ComparatorT comparator) {
-    return Combine.<T, T>globally(MinFn.of(comparator));
+    return Combine.<T, T>globally(Min.of(comparator));
   }
 
   /**
@@ -151,19 +207,12 @@ public class Min {
    */
   public static <K, T, ComparatorT extends Comparator<? super T> & Serializable>
   Combine.PerKey<K, T, T> perKey(ComparatorT comparator) {
-    return Combine.<K, T, T>perKey(MinFn.of(comparator));
+    return Combine.<K, T, T>perKey(Min.of(comparator));
   }
 
   /////////////////////////////////////////////////////////////////////////////
 
-  /**
-   * A {@code CombineFn} that computes the maximum of a collection of elements of type {@code T}
-   * using an arbitrary {@link Comparator}, useful as an argument to {@link Combine#globally} or
-   * {@link Combine#perKey}.
-   *
-   * @param <T> the type of the values being compared
-   */
-  public static class MinFn<T> extends BinaryCombineFn<T> {
+  private static class MinFn<T> extends BinaryCombineFn<T> {
 
     private final T identity;
     private final Comparator<? super T> comparator;
@@ -174,24 +223,6 @@ public class Min {
       this.comparator = comparator;
     }
 
-    public static <T, ComparatorT extends Comparator<? super T> & Serializable>
-    MinFn<T> of(T identity, ComparatorT comparator) {
-      return new MinFn<T>(identity, comparator);
-    }
-
-    public static <T, ComparatorT extends Comparator<? super T> & Serializable>
-    MinFn<T> of(ComparatorT comparator) {
-      return new MinFn<T>(null, comparator);
-    }
-
-    public static <T extends Comparable<? super T>> MinFn<T> naturalOrder(T identity) {
-      return new MinFn<T>(identity, new Top.Largest<T>());
-    }
-
-    public static <T extends Comparable<? super T>> MinFn<T> naturalOrder() {
-      return new MinFn<T>(null, new Top.Largest<T>());
-    }
-
     @Override
     public T identity() {
       return identity;
@@ -210,11 +241,7 @@ public class Min {
     }
   }
 
-  /**
-   * A {@code CombineFn} that computes the minimum of a collection of {@code Integer}s, useful as an
-   * argument to {@link Combine#globally} or {@link Combine#perKey}.
-   */
-  public static class MinIntegerFn extends Combine.BinaryCombineIntegerFn {
+  private static class MinIntegerFn extends Combine.BinaryCombineIntegerFn {
 
     @Override
     public int apply(int left, int right) {
@@ -227,11 +254,8 @@ public class Min {
     }
   }
 
-  /**
-   * A {@code CombineFn} that computes the minimum of a collection of {@code Long}s, useful as an
-   * argument to {@link Combine#globally} or {@link Combine#perKey}.
-   */
-  public static class MinLongFn extends Combine.BinaryCombineLongFn {
+  private static class MinLongFn extends Combine.BinaryCombineLongFn {
+
     @Override
     public long apply(long left, long right) {
       return left <= right ? left : right;
@@ -243,11 +267,7 @@ public class Min {
     }
   }
 
-  /**
-   * A {@code CombineFn} that computes the minimum of a collection of {@code Double}s, useful as an
-   * argument to {@link Combine#globally} or {@link Combine#perKey}.
-   */
-  public static class MinDoubleFn extends Combine.BinaryCombineDoubleFn {
+  private static class MinDoubleFn extends Combine.BinaryCombineDoubleFn {
 
     @Override
     public double apply(double left, double right) {

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
index 48eafc3..ccade4d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
@@ -50,7 +50,7 @@ public class Sum {
    * {@code 0} if there are no elements.
    */
   public static Combine.Globally<Integer, Integer> integersGlobally() {
-    return Combine.globally(new SumIntegerFn());
+    return Combine.globally(Sum.ofIntegers());
   }
 
   /**
@@ -62,7 +62,7 @@ public class Sum {
    * that key in the input {@code PCollection}.
    */
   public static <K> Combine.PerKey<K, Integer, Integer> integersPerKey() {
-    return Combine.<K, Integer, Integer>perKey(new SumIntegerFn());
+    return Combine.<K, Integer, Integer>perKey(Sum.ofIntegers());
   }
 
   /**
@@ -73,7 +73,7 @@ public class Sum {
    * {@code 0} if there are no elements.
    */
   public static Combine.Globally<Long, Long> longsGlobally() {
-    return Combine.globally(new SumLongFn());
+    return Combine.globally(Sum.ofLongs());
   }
 
   /**
@@ -85,7 +85,7 @@ public class Sum {
    * that key in the input {@code PCollection}.
    */
   public static <K> Combine.PerKey<K, Long, Long> longsPerKey() {
-    return Combine.<K, Long, Long>perKey(new SumLongFn());
+    return Combine.<K, Long, Long>perKey(Sum.ofLongs());
   }
 
   /**
@@ -96,7 +96,7 @@ public class Sum {
    * {@code 0} if there are no elements.
    */
   public static Combine.Globally<Double, Double> doublesGlobally() {
-    return Combine.globally(new SumDoubleFn());
+    return Combine.globally(Sum.ofDoubles());
   }
 
   /**
@@ -108,18 +108,40 @@ public class Sum {
    * that key in the input {@code PCollection}.
    */
   public static <K> Combine.PerKey<K, Double, Double> doublesPerKey() {
-    return Combine.<K, Double, Double>perKey(new SumDoubleFn());
+    return Combine.<K, Double, Double>perKey(Sum.ofDoubles());
   }
 
+  /**
+   * A {@code CombineFn} that computes the sum of an
+   * {@code Iterable} of {@code Integer}s, useful as an argument to
+   * {@link Combine#globally} or {@link Combine#perKey}.
+   */
+  public static Combine.BinaryCombineIntegerFn ofIntegers() {
+    return new SumIntegerFn();
+  }
 
-  /////////////////////////////////////////////////////////////////////////////
+  /**
+   * A {@code CombineFn} that computes the sum of an
+   * {@code Iterable} of {@code Double}s, useful as an argument to
+   * {@link Combine#globally} or {@link Combine#perKey}.
+   */
+  public static Combine.BinaryCombineDoubleFn ofDoubles() {
+    return new SumDoubleFn();
+  }
 
   /**
    * A {@code SerializableFunction} that computes the sum of an
-   * {@code Iterable} of {@code Integer}s, useful as an argument to
+   * {@code Iterable} of {@code Long}s, useful as an argument to
    * {@link Combine#globally} or {@link Combine#perKey}.
    */
-  public static class SumIntegerFn extends Combine.BinaryCombineIntegerFn {
+  public static Combine.BinaryCombineLongFn ofLongs() {
+    return new SumLongFn();
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  private static class SumIntegerFn extends Combine.BinaryCombineIntegerFn {
+
     @Override
     public int apply(int a, int b) {
       return a + b;
@@ -131,13 +153,8 @@ public class Sum {
     }
   }
 
-  /**
-   * A {@code SerializableFunction} that computes the sum of an
-   * {@code Iterable} of {@code Long}s, useful as an argument to
-   * {@link Combine#globally} or {@link Combine#perKey}.
-   */
-  public static class SumLongFn
-      extends Combine.BinaryCombineLongFn {
+  private static class SumLongFn extends Combine.BinaryCombineLongFn {
+
     @Override
     public long apply(long a, long b) {
       return a + b;
@@ -149,12 +166,8 @@ public class Sum {
     }
   }
 
-  /**
-   * A {@code SerializableFunction} that computes the sum of an
-   * {@code Iterable} of {@code Double}s, useful as an argument to
-   * {@link Combine#globally} or {@link Combine#perKey}.
-   */
-  public static class SumDoubleFn extends Combine.BinaryCombineDoubleFn {
+  private static class SumDoubleFn extends Combine.BinaryCombineDoubleFn {
+
     @Override
     public double apply(double a, double b) {
       return a + b;

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
index 9daecb2..6392fb5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
@@ -51,7 +51,7 @@ public abstract class AfterDelayFromFirstElement extends OnceTrigger {
   protected static final StateTag<Object, AccumulatorCombiningState<Instant,
                                               Combine.Holder<Instant>, Instant>> DELAYED_UNTIL_TAG =
       StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
-          "delayed", InstantCoder.of(), Min.MinFn.<Instant>naturalOrder()));
+          "delayed", InstantCoder.of(), Min.<Instant>naturalOrder()));
 
   private static final PeriodFormatter PERIOD_FORMATTER = PeriodFormat.wordBased(Locale.ENGLISH);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
index 4a706e6..d66e1c4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
@@ -37,7 +37,7 @@ public class AfterPane extends OnceTrigger {
 private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Long>>
       ELEMENTS_IN_PANE_TAG =
       StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
-          "count", VarLongCoder.of(), new Sum.SumLongFn()));
+          "count", VarLongCoder.of(), Sum.ofLongs()));
 
   private final int countElems;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
index 1bf2c3d..22efd85 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
@@ -70,8 +70,8 @@ public class AggregatorPipelineExtractorTest {
     AggregatorProvidingDoFn<ThreadGroup, StrictMath> fn = new AggregatorProvidingDoFn<>();
     when(bound.getFn()).thenReturn(fn);
 
-    Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn());
-    Aggregator<Integer, Integer> aggregatorTwo = fn.addAggregator(new Min.MinIntegerFn());
+    Aggregator<Long, Long> aggregatorOne = fn.addAggregator(Sum.ofLongs());
+    Aggregator<Integer, Integer> aggregatorTwo = fn.addAggregator(Min.ofIntegers());
 
     TransformHierarchy.Node transformNode = mock(TransformHierarchy.Node.class);
     when(transformNode.getTransform()).thenReturn(bound);
@@ -98,8 +98,8 @@ public class AggregatorPipelineExtractorTest {
     AggregatorProvidingDoFn<Object, Void> fn = new AggregatorProvidingDoFn<>();
     when(bound.getFn()).thenReturn(fn);
 
-    Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Max.MaxLongFn());
-    Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(new Min.MinDoubleFn());
+    Aggregator<Long, Long> aggregatorOne = fn.addAggregator(Max.ofLongs());
+    Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(Min.ofDoubles());
 
     TransformHierarchy.Node transformNode = mock(TransformHierarchy.Node.class);
     when(transformNode.getTransform()).thenReturn(bound);
@@ -129,8 +129,8 @@ public class AggregatorPipelineExtractorTest {
     when(bound.getFn()).thenReturn(fn);
     when(otherBound.getFn()).thenReturn(fn);
 
-    Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn());
-    Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(new Min.MinDoubleFn());
+    Aggregator<Long, Long> aggregatorOne = fn.addAggregator(Sum.ofLongs());
+    Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(Min.ofDoubles());
 
     TransformHierarchy.Node transformNode = mock(TransformHierarchy.Node.class);
     when(transformNode.getTransform()).thenReturn(bound);
@@ -160,7 +160,7 @@ public class AggregatorPipelineExtractorTest {
     ParDo.Bound bound = mock(ParDo.Bound.class, "Bound");
 
     AggregatorProvidingDoFn<ThreadGroup, Void> fn = new AggregatorProvidingDoFn<>();
-    Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn());
+    Aggregator<Long, Long> aggregatorOne = fn.addAggregator(Sum.ofLongs());
 
     when(bound.getFn()).thenReturn(fn);
 
@@ -168,7 +168,7 @@ public class AggregatorPipelineExtractorTest {
     ParDo.BoundMulti otherBound = mock(ParDo.BoundMulti.class, "otherBound");
 
     AggregatorProvidingDoFn<Long, Long> otherFn = new AggregatorProvidingDoFn<>();
-    Aggregator<Double, Double> aggregatorTwo = otherFn.addAggregator(new Sum.SumDoubleFn());
+    Aggregator<Double, Double> aggregatorTwo = otherFn.addAggregator(Sum.ofDoubles());
 
     when(otherBound.getFn()).thenReturn(otherFn);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
index cdd4707..4d35e53 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
@@ -41,8 +41,6 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn;
 import org.apache.beam.sdk.transforms.CombineFns.CoCombineResult;
 import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import org.apache.beam.sdk.transforms.Max.MaxIntegerFn;
-import org.apache.beam.sdk.transforms.Min.MinIntegerFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -71,8 +69,8 @@ public class  CombineFnsTest {
 
     TupleTag<Integer> tag = new TupleTag<Integer>();
     CombineFns.compose()
-      .with(new GetIntegerFunction(), new MaxIntegerFn(), tag)
-      .with(new GetIntegerFunction(), new MinIntegerFn(), tag);
+      .with(new GetIntegerFunction(), Max.ofIntegers(), tag)
+      .with(new GetIntegerFunction(), Min.ofIntegers(), tag);
   }
 
   @Test
@@ -82,8 +80,8 @@ public class  CombineFnsTest {
 
     TupleTag<Integer> tag = new TupleTag<Integer>();
     CombineFns.composeKeyed()
-      .with(new GetIntegerFunction(), new MaxIntegerFn(), tag)
-      .with(new GetIntegerFunction(), new MinIntegerFn(), tag);
+      .with(new GetIntegerFunction(), Max.ofIntegers(), tag)
+      .with(new GetIntegerFunction(), Min.ofIntegers(), tag);
   }
 
   @Test
@@ -145,7 +143,7 @@ public class  CombineFnsTest {
         .apply(Combine.globally(CombineFns.compose()
             .with(
                 new GetIntegerFunction(),
-                new MaxIntegerFn(),
+                Max.ofIntegers(),
                 maxIntTag)
             .with(
                 new GetUserStringFunction(),
@@ -159,7 +157,7 @@ public class  CombineFnsTest {
         .apply(Combine.perKey(CombineFns.composeKeyed()
             .with(
                 new GetIntegerFunction(),
-                new MaxIntegerFn().<String>asKeyedFn(),
+                Max.ofIntegers().<String>asKeyedFn(),
                 maxIntTag)
             .with(
                 new GetUserStringFunction(),
@@ -203,7 +201,7 @@ public class  CombineFnsTest {
         .apply(Combine.globally(CombineFns.compose()
             .with(
                 new GetIntegerFunction(),
-                new MaxIntegerFn(),
+                Max.ofIntegers(),
                 maxIntTag)
             .with(
                 new GetUserStringFunction(),
@@ -219,7 +217,7 @@ public class  CombineFnsTest {
         .apply(Combine.perKey(CombineFns.composeKeyed()
             .with(
                 new GetIntegerFunction(),
-                new MaxIntegerFn().<String>asKeyedFn(),
+                Max.ofIntegers().<String>asKeyedFn(),
                 maxIntTag)
             .with(
                 new GetUserStringFunction(),
@@ -262,7 +260,7 @@ public class  CombineFnsTest {
         .apply(Combine.perKey(CombineFns.composeKeyed()
             .with(
                 new GetIntegerFunction(),
-                new MaxIntegerFn().<String>asKeyedFn(),
+                Max.ofIntegers().<String>asKeyedFn(),
                 maxIntTag)
             .with(
                 new GetUserStringFunction(),