You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/01/03 22:36:36 UTC

[1/7] beam git commit: [BEAM-1223] Reduced [Sum|Min|Max|Mean]*Fn viability so as to make them an internal implementation detail and prevent external code from employing them for tasks such as pipeline translation.

Repository: beam
Updated Branches:
  refs/heads/master e794f1486 -> d86db15ba


http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
index 19b7c51..4e80154 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
@@ -28,7 +28,6 @@ import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Max.MaxIntegerFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.junit.Rule;
 import org.junit.Test;
@@ -59,7 +58,7 @@ public class DoFnTest implements Serializable {
   @Test
   public void testCreateAggregatorWithCombinerSucceeds() {
     String name = "testAggregator";
-    Sum.SumLongFn combiner = new Sum.SumLongFn();
+    Combine.BinaryCombineLongFn combiner = Sum.ofLongs();
 
     DoFn<Void, Void> doFn = new NoOpDoFn();
 
@@ -76,7 +75,7 @@ public class DoFnTest implements Serializable {
 
     DoFn<Void, Void> doFn = new NoOpDoFn();
 
-    doFn.createAggregator(null, new Sum.SumLongFn());
+    doFn.createAggregator(null, Sum.ofLongs());
   }
 
   @Test
@@ -106,7 +105,7 @@ public class DoFnTest implements Serializable {
   @Test
   public void testCreateAggregatorWithSameNameThrowsException() {
     String name = "testAggregator";
-    CombineFn<Double, ?, Double> combiner = new Max.MaxDoubleFn();
+    CombineFn<Double, ?, Double> combiner = Max.ofDoubles();
 
     DoFn<Void, Void> doFn = new NoOpDoFn();
 
@@ -124,7 +123,7 @@ public class DoFnTest implements Serializable {
   public void testCreateAggregatorsWithDifferentNamesSucceeds() {
     String nameOne = "testAggregator";
     String nameTwo = "aggregatorPrime";
-    CombineFn<Double, ?, Double> combiner = new Max.MaxDoubleFn();
+    CombineFn<Double, ?, Double> combiner = Max.ofDoubles();
 
     DoFn<Void, Void> doFn = new NoOpDoFn();
 
@@ -151,7 +150,7 @@ public class DoFnTest implements Serializable {
     TestPipeline p = createTestPipeline(new DoFn<String, String>() {
       @StartBundle
       public void startBundle(Context c) {
-        createAggregator("anyAggregate", new MaxIntegerFn());
+        createAggregator("anyAggregate", Max.ofIntegers());
       }
 
       @ProcessElement
@@ -170,7 +169,7 @@ public class DoFnTest implements Serializable {
     TestPipeline p = createTestPipeline(new DoFn<String, String>() {
       @ProcessElement
       public void processElement(ProcessContext c) {
-        createAggregator("anyAggregate", new MaxIntegerFn());
+        createAggregator("anyAggregate", Max.ofIntegers());
       }
     });
 
@@ -186,7 +185,7 @@ public class DoFnTest implements Serializable {
     TestPipeline p = createTestPipeline(new DoFn<String, String>() {
       @FinishBundle
       public void finishBundle(Context c) {
-        createAggregator("anyAggregate", new MaxIntegerFn());
+        createAggregator("anyAggregate", Max.ofIntegers());
       }
 
       @ProcessElement

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
index 3859c9f..699687f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
@@ -404,11 +404,11 @@ public class DoFnTesterTest {
    * {@link DoFn.ProcessElement @ProcessElement}.
    */
   private static class CounterDoFn extends DoFn<Long, String> {
-    Aggregator<Long, Long> agg = createAggregator("ctr", new Sum.SumLongFn());
+    Aggregator<Long, Long> agg = createAggregator("ctr", Sum.ofLongs());
     Aggregator<Long, Long> startBundleCalls =
-        createAggregator("startBundleCalls", new Sum.SumLongFn());
+        createAggregator("startBundleCalls", Sum.ofLongs());
     Aggregator<Long, Long> finishBundleCalls =
-        createAggregator("finishBundleCalls", new Sum.SumLongFn());
+        createAggregator("finishBundleCalls", Sum.ofLongs());
 
     private enum LifecycleState {
       UNINITIALIZED,

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
index 4aa39a3..2b43560 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
@@ -46,7 +46,7 @@ public class MaxTest {
   @Test
   public void testMaxIntegerFn() {
     checkCombineFn(
-        new Max.MaxIntegerFn(),
+        Max.ofIntegers(),
         Lists.newArrayList(1, 2, 3, 4),
         4);
   }
@@ -54,7 +54,7 @@ public class MaxTest {
   @Test
   public void testMaxLongFn() {
     checkCombineFn(
-        new Max.MaxLongFn(),
+        Max.ofLongs(),
         Lists.newArrayList(1L, 2L, 3L, 4L),
         4L);
   }
@@ -62,7 +62,7 @@ public class MaxTest {
   @Test
   public void testMaxDoubleFn() {
     checkCombineFn(
-        new Max.MaxDoubleFn(),
+        Max.ofDoubles(),
         Lists.newArrayList(1.0, 2.0, 3.0, 4.0),
         4.0);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java
index 84741ee..79ebc25 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java
@@ -65,7 +65,7 @@ public class MeanTest {
   @Test
   public void testMeanFn() throws Exception {
     checkCombineFn(
-        new Mean.MeanFn<Integer>(),
+        Mean.<Integer>of(),
         Lists.newArrayList(1, 2, 3, 4),
         2.5);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
index 4334ed9..e89b223 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
@@ -46,7 +46,7 @@ public class MinTest {
   @Test
   public void testMinIntegerFn() {
     checkCombineFn(
-        new Min.MinIntegerFn(),
+        Min.ofIntegers(),
         Lists.newArrayList(1, 2, 3, 4),
         1);
   }
@@ -54,7 +54,7 @@ public class MinTest {
   @Test
   public void testMinLongFn() {
     checkCombineFn(
-        new Min.MinLongFn(),
+        Min.ofLongs(),
         Lists.newArrayList(1L, 2L, 3L, 4L),
         1L);
   }
@@ -62,7 +62,7 @@ public class MinTest {
   @Test
   public void testMinDoubleFn() {
     checkCombineFn(
-        new Min.MinDoubleFn(),
+        Min.ofDoubles(),
         Lists.newArrayList(1.0, 2.0, 3.0, 4.0),
         1.0);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java
index b2d4aed..b5cb286 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java
@@ -55,7 +55,7 @@ public class OldDoFnContextTest {
 
   @Test
   public void testSetupDelegateAggregatorsCreatesAndLinksDelegateAggregators() {
-    Sum.SumLongFn combiner = new Sum.SumLongFn();
+    Combine.BinaryCombineLongFn combiner = Sum.ofLongs();
     Aggregator<Long, Long> delegateAggregator =
         fn.createAggregator("test", combiner);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
index cc84252..1c767b1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
@@ -26,7 +26,6 @@ import static org.junit.Assert.assertThat;
 import java.io.Serializable;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Max.MaxIntegerFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Instant;
@@ -48,7 +47,7 @@ public class OldDoFnTest implements Serializable {
   @Test
   public void testCreateAggregatorWithCombinerSucceeds() {
     String name = "testAggregator";
-    Sum.SumLongFn combiner = new Sum.SumLongFn();
+    Combine.BinaryCombineLongFn combiner = Sum.ofLongs();
 
     OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
 
@@ -65,7 +64,7 @@ public class OldDoFnTest implements Serializable {
 
     OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
 
-    doFn.createAggregator(null, new Sum.SumLongFn());
+    doFn.createAggregator(null, Sum.ofLongs());
   }
 
   @Test
@@ -95,7 +94,7 @@ public class OldDoFnTest implements Serializable {
   @Test
   public void testCreateAggregatorWithSameNameThrowsException() {
     String name = "testAggregator";
-    CombineFn<Double, ?, Double> combiner = new Max.MaxDoubleFn();
+    CombineFn<Double, ?, Double> combiner = Max.ofDoubles();
 
     OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
 
@@ -113,7 +112,7 @@ public class OldDoFnTest implements Serializable {
   public void testCreateAggregatorsWithDifferentNamesSucceeds() {
     String nameOne = "testAggregator";
     String nameTwo = "aggregatorPrime";
-    CombineFn<Double, ?, Double> combiner = new Max.MaxDoubleFn();
+    CombineFn<Double, ?, Double> combiner = Max.ofDoubles();
 
     OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
 
@@ -135,7 +134,7 @@ public class OldDoFnTest implements Serializable {
     context.setupDelegateAggregators();
 
     thrown.expect(isA(IllegalStateException.class));
-    fn.createAggregator("anyAggregate", new MaxIntegerFn());
+    fn.createAggregator("anyAggregate", Max.ofIntegers());
   }
 
   private OldDoFn<String, String>.Context createContext(OldDoFn<String, String> fn) {

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SimpleStatsFnsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SimpleStatsFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SimpleStatsFnsTest.java
index a782ecc..2fd86c6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SimpleStatsFnsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SimpleStatsFnsTest.java
@@ -78,31 +78,31 @@ public class SimpleStatsFnsTest {
 
   @Test
   public void testInstantStats() {
-    assertEquals(new Instant(1000), Min.MinFn.<Instant>naturalOrder().apply(
+    assertEquals(new Instant(1000), Min.<Instant>naturalOrder().apply(
         Arrays.asList(new Instant(1000), new Instant(2000))));
-    assertEquals(null, Min.MinFn.<Instant>naturalOrder().apply(
+    assertEquals(null, Min.<Instant>naturalOrder().apply(
         Collections.<Instant>emptyList()));
-    assertEquals(new Instant(5000), Min.MinFn.<Instant>naturalOrder(new Instant(5000)).apply(
+    assertEquals(new Instant(5000), Min.<Instant>naturalOrder(new Instant(5000)).apply(
         Collections.<Instant>emptyList()));
 
-    assertEquals(new Instant(2000), Max.MaxFn.<Instant>naturalOrder().apply(
+    assertEquals(new Instant(2000), Max.<Instant>naturalOrder().apply(
         Arrays.asList(new Instant(1000), new Instant(2000))));
-    assertEquals(null, Max.MaxFn.<Instant>naturalOrder().apply(
+    assertEquals(null, Max.<Instant>naturalOrder().apply(
         Collections.<Instant>emptyList()));
-    assertEquals(new Instant(5000), Max.MaxFn.<Instant>naturalOrder(new Instant(5000)).apply(
+    assertEquals(new Instant(5000), Max.<Instant>naturalOrder(new Instant(5000)).apply(
         Collections.<Instant>emptyList()));
   }
 
   @Test
   public void testDoubleStats() {
     for (TestCase<Double> t : DOUBLE_CASES) {
-      assertEquals(t.sum, new Sum.SumDoubleFn().apply(t.data),
+      assertEquals(t.sum, Sum.ofDoubles().apply(t.data),
           DOUBLE_COMPARISON_ACCURACY);
-      assertEquals(t.min, new Min.MinDoubleFn().apply(t.data),
+      assertEquals(t.min, Min.ofDoubles().apply(t.data),
           DOUBLE_COMPARISON_ACCURACY);
-      assertEquals(t.max, new Max.MaxDoubleFn().apply(t.data),
+      assertEquals(t.max, Max.ofDoubles().apply(t.data),
           DOUBLE_COMPARISON_ACCURACY);
-      assertEquals(t.mean, new Mean.MeanFn<Double>().apply(t.data),
+      assertEquals(t.mean, Mean.<Double>of().apply(t.data),
           DOUBLE_COMPARISON_ACCURACY);
     }
   }
@@ -110,20 +110,20 @@ public class SimpleStatsFnsTest {
   @Test
   public void testIntegerStats() {
     for (TestCase<Integer> t : INTEGER_CASES) {
-      assertEquals(t.sum, new Sum.SumIntegerFn().apply(t.data));
-      assertEquals(t.min, new Min.MinIntegerFn().apply(t.data));
-      assertEquals(t.max, new Max.MaxIntegerFn().apply(t.data));
-      assertEquals(t.mean, new Mean.MeanFn<Integer>().apply(t.data));
+      assertEquals(t.sum, Sum.ofIntegers().apply(t.data));
+      assertEquals(t.min, Min.ofIntegers().apply(t.data));
+      assertEquals(t.max, Max.ofIntegers().apply(t.data));
+      assertEquals(t.mean, Mean.<Integer>of().apply(t.data));
     }
   }
 
   @Test
   public void testLongStats() {
     for (TestCase<Long> t : LONG_CASES) {
-      assertEquals(t.sum, new Sum.SumLongFn().apply(t.data));
-      assertEquals(t.min, new Min.MinLongFn().apply(t.data));
-      assertEquals(t.max, new Max.MaxLongFn().apply(t.data));
-      assertEquals(t.mean, new Mean.MeanFn<Long>().apply(t.data));
+      assertEquals(t.sum, Sum.ofLongs().apply(t.data));
+      assertEquals(t.min, Min.ofLongs().apply(t.data));
+      assertEquals(t.max, Max.ofLongs().apply(t.data));
+      assertEquals(t.mean, Mean.<Long>of().apply(t.data));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java
index 04c0186..b2f8aa8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java
@@ -52,7 +52,7 @@ public class SumTest {
   @Test
   public void testSumIntegerFn() {
     checkCombineFn(
-        new Sum.SumIntegerFn(),
+        Sum.ofIntegers(),
         Lists.newArrayList(1, 2, 3, 4),
         10);
   }
@@ -60,7 +60,7 @@ public class SumTest {
   @Test
   public void testSumLongFn() {
     checkCombineFn(
-        new Sum.SumLongFn(),
+        Sum.ofLongs(),
         Lists.newArrayList(1L, 2L, 3L, 4L),
         10L);
   }
@@ -68,14 +68,14 @@ public class SumTest {
   @Test
   public void testSumDoubleFn() {
     checkCombineFn(
-        new Sum.SumDoubleFn(),
+        Sum.ofDoubles(),
         Lists.newArrayList(1.0, 2.0, 3.0, 4.0),
         10.0);
   }
 
   @Test
   public void testGetAccumulatorCoderEquals() {
-    Sum.SumIntegerFn sumIntegerFn = new Sum.SumIntegerFn();
+    Combine.BinaryCombineIntegerFn sumIntegerFn = Sum.ofIntegers();
     assertEquals(
         sumIntegerFn.getAccumulatorCoder(STANDARD_REGISTRY, VarIntCoder.of()),
         sumIntegerFn.getAccumulatorCoder(STANDARD_REGISTRY, VarIntCoder.of()));
@@ -83,7 +83,7 @@ public class SumTest {
         sumIntegerFn.getAccumulatorCoder(STANDARD_REGISTRY, VarIntCoder.of()),
         sumIntegerFn.getAccumulatorCoder(STANDARD_REGISTRY, BigEndianIntegerCoder.of()));
 
-    Sum.SumLongFn sumLongFn = new Sum.SumLongFn();
+    Combine.BinaryCombineLongFn sumLongFn = Sum.ofLongs();
     assertEquals(
         sumLongFn.getAccumulatorCoder(STANDARD_REGISTRY, VarLongCoder.of()),
         sumLongFn.getAccumulatorCoder(STANDARD_REGISTRY, VarLongCoder.of()));
@@ -91,7 +91,7 @@ public class SumTest {
         sumLongFn.getAccumulatorCoder(STANDARD_REGISTRY, VarLongCoder.of()),
         sumLongFn.getAccumulatorCoder(STANDARD_REGISTRY, BigEndianLongCoder.of()));
 
-    Sum.SumDoubleFn sumDoubleFn = new Sum.SumDoubleFn();
+    Combine.BinaryCombineDoubleFn sumDoubleFn = Sum.ofDoubles();
     assertEquals(
         sumDoubleFn.getAccumulatorCoder(STANDARD_REGISTRY, DoubleCoder.of()),
         sumDoubleFn.getAccumulatorCoder(STANDARD_REGISTRY, DoubleCoder.of()));

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
index 1d8b32c..0249ac9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
@@ -1120,7 +1120,7 @@ public class ViewTest implements Serializable {
 
     final PCollectionView<Map<String, Integer>> view =
         pipeline.apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("a", 20), KV.of("b", 3)))
-            .apply("SumIntegers", Combine.perKey(new Sum.SumIntegerFn().<String>asKeyedFn()))
+            .apply("SumIntegers", Combine.perKey(Sum.ofIntegers().<String>asKeyedFn()))
             .apply(View.<String, Integer>asMap());
 
     PCollection<KV<String, Integer>> output =

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java
index fe81275..36a90e9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java
@@ -70,18 +70,18 @@ public class CombineFnUtilTest {
   @Test
   public void testToFnWithContextIdempotent() throws Exception {
     CombineFnWithContext<Integer, int[], Integer> fnWithContext =
-        CombineFnUtil.toFnWithContext(new Sum.SumIntegerFn());
+        CombineFnUtil.toFnWithContext(Sum.ofIntegers());
     assertTrue(fnWithContext == CombineFnUtil.toFnWithContext(fnWithContext));
 
     KeyedCombineFnWithContext<Object, Integer, int[], Integer> keyedFnWithContext =
-        CombineFnUtil.toFnWithContext(new Sum.SumIntegerFn().asKeyedFn());
+        CombineFnUtil.toFnWithContext(Sum.ofIntegers().asKeyedFn());
     assertTrue(keyedFnWithContext == CombineFnUtil.toFnWithContext(keyedFnWithContext));
   }
 
   @Test
   public void testToFnWithContext() throws Exception {
     CombineFnWithContext<Integer, int[], Integer> fnWithContext =
-        CombineFnUtil.toFnWithContext(new Sum.SumIntegerFn());
+        CombineFnUtil.toFnWithContext(Sum.ofIntegers());
     List<Integer> inputs = ImmutableList.of(1, 2, 3, 4);
     Context nullContext = CombineContextFactory.nullContext();
     int[] accum = fnWithContext.createAccumulator(nullContext);
@@ -91,7 +91,7 @@ public class CombineFnUtilTest {
     assertEquals(10, fnWithContext.extractOutput(accum, nullContext).intValue());
 
     KeyedCombineFnWithContext<String, Integer, int[], Integer> keyedFnWithContext =
-        CombineFnUtil.toFnWithContext(new Sum.SumIntegerFn().<String>asKeyedFn());
+        CombineFnUtil.toFnWithContext(Sum.ofIntegers().<String>asKeyedFn());
     String key = "key";
     accum = keyedFnWithContext.createAccumulator(key, nullContext);
     for (Integer i : inputs) {

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryStateInternalsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryStateInternalsTest.java
index 08a6bc1..e43ad36 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryStateInternalsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryStateInternalsTest.java
@@ -48,7 +48,7 @@ public class InMemoryStateInternalsTest {
       StateTags.value("stringValue", StringUtf8Coder.of());
   private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>>
       SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
-          "sumInteger", VarIntCoder.of(), new Sum.SumIntegerFn());
+          "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
   private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
       StateTags.bag("stringBag", StringUtf8Coder.of());
   private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/StateTagTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/StateTagTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/StateTagTest.java
index 2c8c9cc..7c06dbd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/StateTagTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/StateTagTest.java
@@ -24,10 +24,9 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Max;
-import org.apache.beam.sdk.transforms.Max.MaxIntegerFn;
 import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.Min.MinIntegerFn;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.util.CombineFnUtil;
 import org.junit.Test;
@@ -86,10 +85,10 @@ public class StateTagTest {
   @SuppressWarnings({"unchecked", "rawtypes"})
   @Test
   public void testCombiningValueEquality() {
-    MaxIntegerFn maxFn = new Max.MaxIntegerFn();
+    Combine.BinaryCombineIntegerFn maxFn = Max.ofIntegers();
     Coder<Integer> input1 = VarIntCoder.of();
     Coder<Integer> input2 = BigEndianIntegerCoder.of();
-    MinIntegerFn minFn = new Min.MinIntegerFn();
+    Combine.BinaryCombineIntegerFn minFn = Min.ofIntegers();
 
     StateTag<?, ?> fooCoder1Max1 = StateTags.combiningValueFromInputInternal("foo", input1, maxFn);
     StateTag<?, ?> fooCoder1Max2 = StateTags.combiningValueFromInputInternal("foo", input1, maxFn);
@@ -129,8 +128,8 @@ public class StateTagTest {
     CoderRegistry registry = new CoderRegistry();
     registry.registerStandardCoders();
 
-    MaxIntegerFn maxFn = new Max.MaxIntegerFn();
-    MinIntegerFn minFn = new Min.MinIntegerFn();
+    Combine.BinaryCombineIntegerFn maxFn = Max.ofIntegers();
+    Combine.BinaryCombineIntegerFn minFn = Min.ofIntegers();
 
     Coder<int[]> accum1 = maxFn.getAccumulatorCoder(registry, VarIntCoder.of());
     Coder<int[]> accum2 = minFn.getAccumulatorCoder(registry, BigEndianIntegerCoder.of());

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 3a36add..7efa115 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -2577,7 +2577,7 @@ public class BigQueryIO {
 
     /** Tracks bytes written, exposed as "ByteCount" Counter. */
     private Aggregator<Long, Long> byteCountAggregator =
-        createAggregator("ByteCount", new Sum.SumLongFn());
+        createAggregator("ByteCount", Sum.ofLongs());
 
     /** Constructor. */
     StreamingWriteFn(ValueProvider<TableSchema> schema, BigQueryServices bqServices) {


[3/7] beam git commit: [BEAM-1223] Resolved some type-safety related warnings.

Posted by lc...@apache.org.
[BEAM-1223] Resolved some type-safety related warnings.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6b68699c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6b68699c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6b68699c

Branch: refs/heads/master
Commit: 6b68699cd75e8b17bc84cf58053fe1f7583e447f
Parents: 78a360e
Author: Stas Levin <st...@gmail.com>
Authored: Mon Jan 2 18:29:21 2017 +0200
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Jan 3 14:28:19 2017 -0800

----------------------------------------------------------------------
 .../core/src/main/java/org/apache/beam/sdk/transforms/Max.java   | 4 ++--
 .../core/src/main/java/org/apache/beam/sdk/transforms/Min.java   | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6b68699c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
index 696bed9..91851bc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
@@ -195,7 +195,7 @@ public class Max {
    */
   public static <T, ComparatorT extends Comparator<? super T> & Serializable>
   Combine.Globally<T, T> globally(ComparatorT comparator) {
-    return Combine.<T, T>globally(Max.of(comparator));
+    return Combine.<T, T>globally(Max.<T, ComparatorT>of(comparator));
   }
 
   /**
@@ -207,7 +207,7 @@ public class Max {
    */
   public static <K, T, ComparatorT extends Comparator<? super T> & Serializable>
   Combine.PerKey<K, T, T> perKey(ComparatorT comparator) {
-    return Combine.<K, T, T>perKey(Max.of(comparator));
+    return Combine.<K, T, T>perKey(Max.<T, ComparatorT>of(comparator));
   }
 
   /////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/beam/blob/6b68699c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
index b208929..3518a35 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
@@ -195,7 +195,7 @@ public class Min {
    */
   public static <T, ComparatorT extends Comparator<? super T> & Serializable>
   Combine.Globally<T, T> globally(ComparatorT comparator) {
-    return Combine.<T, T>globally(Min.of(comparator));
+    return Combine.<T, T>globally(Min.<T, ComparatorT>of(comparator));
   }
 
   /**
@@ -207,7 +207,7 @@ public class Min {
    */
   public static <K, T, ComparatorT extends Comparator<? super T> & Serializable>
   Combine.PerKey<K, T, T> perKey(ComparatorT comparator) {
-    return Combine.<K, T, T>perKey(Min.of(comparator));
+    return Combine.<K, T, T>perKey(Min.<T, ComparatorT>of(comparator));
   }
 
   /////////////////////////////////////////////////////////////////////////////


[2/7] beam git commit: [BEAM-1223] Reduced [Sum|Min|Max|Mean]*Fn viability so as to make them an internal implementation detail and prevent external code from employing them for tasks such as pipeline translation.

Posted by lc...@apache.org.
[BEAM-1223] Reduced [Sum|Min|Max|Mean]*Fn viability so as to make them an internal implementation detail and prevent external code from employing them for tasks such as pipeline translation.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/78a360ea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/78a360ea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/78a360ea

Branch: refs/heads/master
Commit: 78a360eac35507d9a558fc6117bb56b67b8a884e
Parents: e794f14
Author: Stas Levin <st...@gmail.com>
Authored: Sun Jan 1 13:58:32 2017 +0200
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Jan 3 14:25:26 2017 -0800

----------------------------------------------------------------------
 .../beam/examples/DebuggingWordCount.java       |   4 +-
 .../org/apache/beam/examples/WordCount.java     |   2 +-
 .../cookbook/CombinePerKeyExamples.java         |   2 +-
 .../beam/examples/complete/game/GameStats.java  |   2 +-
 .../beam/examples/complete/game/UserScore.java  |   2 +-
 .../runners/apex/examples/WordCountTest.java    |   2 +-
 .../utils/ApexStateInternalsTest.java           |   2 +-
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |   4 +-
 .../runners/core/GroupAlsoByWindowsDoFn.java    |   4 +-
 .../apache/beam/runners/core/NonEmptyPanes.java |   2 +-
 .../AfterDelayFromFirstElementStateMachine.java |   2 +-
 .../core/triggers/AfterPaneStateMachine.java    |   2 +-
 .../core/LateDataDroppingDoFnRunnerTest.java    |   2 +-
 .../beam/runners/core/ReduceFnRunnerTest.java   |  12 +-
 .../beam/runners/core/ReduceFnTester.java       |   2 +-
 .../GroupAlsoByWindowEvaluatorFactory.java      |   4 +-
 .../runners/direct/AggregatorContainerTest.java |  16 +--
 .../CopyOnAccessInMemoryStateInternalsTest.java |   4 +-
 .../runners/direct/EvaluationContextTest.java   |   6 +-
 .../beam/runners/flink/examples/WordCount.java  |   2 +-
 .../flink/examples/streaming/AutoComplete.java  |   2 +-
 .../KafkaWindowedWordCountExample.java          |   2 +-
 .../examples/streaming/WindowedWordCount.java   |   2 +-
 .../streaming/FlinkStateInternalsTest.java      |   2 +-
 .../dataflow/DataflowPipelineJobTest.java       |   8 +-
 .../spark/aggregators/NamedAggregators.java     |   4 +-
 .../beam/runners/spark/examples/WordCount.java  |   2 +-
 .../translation/SparkGroupAlsoByWindowFn.java   |   2 +-
 .../spark/translation/SparkRuntimeContext.java  |  63 ++++-------
 .../ResumeFromCheckpointStreamingTest.java      |   2 +-
 .../streaming/utils/PAssertStreaming.java       |   4 +-
 .../apache/beam/sdk/io/PubsubUnboundedSink.java |   8 +-
 .../beam/sdk/io/PubsubUnboundedSource.java      |   5 +-
 .../org/apache/beam/sdk/testing/PAssert.java    |  12 +-
 .../org/apache/beam/sdk/transforms/Max.java     | 112 +++++++++++--------
 .../org/apache/beam/sdk/transforms/Mean.java    |  13 ++-
 .../org/apache/beam/sdk/transforms/Min.java     | 110 ++++++++++--------
 .../org/apache/beam/sdk/transforms/Sum.java     |  57 ++++++----
 .../windowing/AfterDelayFromFirstElement.java   |   2 +-
 .../sdk/transforms/windowing/AfterPane.java     |   2 +-
 .../sdk/AggregatorPipelineExtractorTest.java    |  16 +--
 .../beam/sdk/transforms/CombineFnsTest.java     |  20 ++--
 .../apache/beam/sdk/transforms/DoFnTest.java    |  15 ++-
 .../beam/sdk/transforms/DoFnTesterTest.java     |   6 +-
 .../org/apache/beam/sdk/transforms/MaxTest.java |   6 +-
 .../apache/beam/sdk/transforms/MeanTest.java    |   2 +-
 .../org/apache/beam/sdk/transforms/MinTest.java |   6 +-
 .../beam/sdk/transforms/OldDoFnContextTest.java |   2 +-
 .../apache/beam/sdk/transforms/OldDoFnTest.java |  11 +-
 .../beam/sdk/transforms/SimpleStatsFnsTest.java |  36 +++---
 .../org/apache/beam/sdk/transforms/SumTest.java |  12 +-
 .../apache/beam/sdk/transforms/ViewTest.java    |   2 +-
 .../apache/beam/sdk/util/CombineFnUtilTest.java |   8 +-
 .../util/state/InMemoryStateInternalsTest.java  |   2 +-
 .../beam/sdk/util/state/StateTagTest.java       |  11 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |   2 +-
 56 files changed, 339 insertions(+), 310 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
index f7c537c..997d590 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
@@ -95,9 +95,9 @@ public class DebuggingWordCount {
      * in a dashboard, etc.
      */
     private final Aggregator<Long, Long> matchedWords =
-        createAggregator("matchedWords", new Sum.SumLongFn());
+        createAggregator("matchedWords", Sum.ofLongs());
     private final Aggregator<Long, Long> unmatchedWords =
-        createAggregator("umatchedWords", new Sum.SumLongFn());
+        createAggregator("umatchedWords", Sum.ofLongs());
 
     @ProcessElement
     public void processElement(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
index 224d7db..7e21d47 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
@@ -87,7 +87,7 @@ public class WordCount {
    */
   static class ExtractWordsFn extends DoFn<String, String> {
     private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", new Sum.SumLongFn());
+        createAggregator("emptyLines", Sum.ofLongs());
 
     @ProcessElement
     public void processElement(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
index 29655ea..37f9d79 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
@@ -80,7 +80,7 @@ public class CombinePerKeyExamples {
    */
   static class ExtractLargeWordsFn extends DoFn<TableRow, KV<String, String>> {
     private final Aggregator<Long, Long> smallerWords =
-        createAggregator("smallerWords", new Sum.SumLongFn());
+        createAggregator("smallerWords", Sum.ofLongs());
 
     @ProcessElement
     public void processElement(ProcessContext c){

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index 6ad6a23..74f1b30 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -127,7 +127,7 @@ public class GameStats extends LeaderBoard {
               .withSideInputs(globalMeanScore)
               .of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
                 private final Aggregator<Long, Long> numSpammerUsers =
-                  createAggregator("SpammerUsers", new Sum.SumLongFn());
+                  createAggregator("SpammerUsers", Sum.ofLongs());
                 @ProcessElement
                 public void processElement(ProcessContext c) {
                   Integer score = c.element().getValue();

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
index cb81a7e..7dd5a8e 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
@@ -126,7 +126,7 @@ public class UserScore {
     // Log and count parse errors.
     private static final Logger LOG = LoggerFactory.getLogger(ParseEventFn.class);
     private final Aggregator<Long, Long> numParseErrors =
-        createAggregator("ParseErrors", new Sum.SumLongFn());
+        createAggregator("ParseErrors", Sum.ofLongs());
 
     @ProcessElement
     public void processElement(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
index 28bb8ad..a1713ac 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
@@ -68,7 +68,7 @@ public class WordCountTest {
   static class ExtractWordsFn extends DoFn<String, String> {
     private static final long serialVersionUID = 1L;
     private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", new Sum.SumLongFn());
+        createAggregator("emptyLines", Sum.ofLongs());
 
     @ProcessElement
     public void processElement(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
index 1801358..4d797f1 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
@@ -62,7 +62,7 @@ public class ApexStateInternalsTest {
       StateTags.value("stringValue", StringUtf8Coder.of());
   private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>>
       SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
-          "sumInteger", VarIntCoder.of(), new Sum.SumIntegerFn());
+          "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
   private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
       StateTags.bag("stringBag", StringUtf8Coder.of());
   private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index 14171b3..d79683a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -50,9 +50,9 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
 
   protected final Aggregator<Long, Long> droppedDueToClosedWindow =
       createAggregator(
-          GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, new Sum.SumLongFn());
+          GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
   protected final Aggregator<Long, Long> droppedDueToLateness =
-      createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, new Sum.SumLongFn());
+      createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
 
   private final WindowingStrategy<Object, W> windowingStrategy;
   private final StateInternalsFactory<K> stateInternalsFactory;

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
index 1b32d84..9a2f8fd 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
@@ -41,7 +41,7 @@ public abstract class GroupAlsoByWindowsDoFn<K, InputT, OutputT, W extends Bound
   public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness";
 
   protected final Aggregator<Long, Long> droppedDueToClosedWindow =
-      createAggregator(DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, new Sum.SumLongFn());
+      createAggregator(DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
   protected final Aggregator<Long, Long> droppedDueToLateness =
-      createAggregator(DROPPED_DUE_TO_LATENESS_COUNTER, new Sum.SumLongFn());
+      createAggregator(DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
index 3e51dfb..0a6fd93 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
@@ -121,7 +121,7 @@ public abstract class NonEmptyPanes<K, W extends BoundedWindow> {
     private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Long>>
         PANE_ADDITIONS_TAG =
         StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
-            "count", VarLongCoder.of(), new Sum.SumLongFn()));
+            "count", VarLongCoder.of(), Sum.ofLongs()));
 
     @Override
     public void recordContent(StateAccessor<K> state) {

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
index c8922df..b60c690 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
@@ -56,7 +56,7 @@ public abstract class AfterDelayFromFirstElementStateMachine extends OnceTrigger
   protected static final StateTag<Object, AccumulatorCombiningState<Instant,
                                               Combine.Holder<Instant>, Instant>> DELAYED_UNTIL_TAG =
       StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
-          "delayed", InstantCoder.of(), Min.MinFn.<Instant>naturalOrder()));
+          "delayed", InstantCoder.of(), Min.<Instant>naturalOrder()));
 
   private static final PeriodFormatter PERIOD_FORMATTER = PeriodFormat.wordBased(Locale.ENGLISH);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
index 38b95f9..d8ad370 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
@@ -38,7 +38,7 @@ public class AfterPaneStateMachine extends OnceTriggerStateMachine {
 private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Long>>
       ELEMENTS_IN_PANE_TAG =
       StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
-          "count", VarLongCoder.of(), new Sum.SumLongFn()));
+          "count", VarLongCoder.of(), Sum.ofLongs()));
 
   private final int countElems;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
index 3cd5d4a..efe2044 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
@@ -114,7 +114,7 @@ public class LateDataDroppingDoFnRunnerTest {
 
     @Override
     public CombineFn<Long, ?, Long> getCombineFn() {
-      return new Sum.SumLongFn();
+      return Sum.ofLongs();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 4abfc9a..1bd717f 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -217,7 +217,7 @@ public class ReduceFnRunnerTest {
         ReduceFnTester.combining(
             strategy,
             mockTriggerStateMachine,
-            new Sum.SumIntegerFn().<String>asKeyedFn(),
+            Sum.ofIntegers().<String>asKeyedFn(),
             VarIntCoder.of());
 
     injectElement(tester, 2);
@@ -290,7 +290,7 @@ public class ReduceFnRunnerTest {
 
     ReduceFnTester<Integer, Integer, IntervalWindow> tester =
         ReduceFnTester
-            .combining(strategy, new Sum.SumIntegerFn().<String>asKeyedFn(), VarIntCoder.of());
+            .combining(strategy, Sum.ofIntegers().<String>asKeyedFn(), VarIntCoder.of());
 
     tester.injectElements(TimestampedValue.of(13, elementTimestamp));
 
@@ -322,7 +322,7 @@ public class ReduceFnRunnerTest {
         ReduceFnTester.combining(
             strategy,
             mockTriggerStateMachine,
-            new Sum.SumIntegerFn().<String>asKeyedFn(),
+            Sum.ofIntegers().<String>asKeyedFn(),
             VarIntCoder.of());
 
     injectElement(tester, 1);
@@ -1069,7 +1069,7 @@ public class ReduceFnRunnerTest {
             SlidingWindows.of(Duration.millis(100)).every(Duration.millis(30)))
         .withTrigger(AfterWatermark.pastEndOfWindow())
         .withAllowedLateness(Duration.millis(1000)),
-        new Sum.SumIntegerFn().<String>asKeyedFn(), VarIntCoder.of());
+        Sum.ofIntegers().<String>asKeyedFn(), VarIntCoder.of());
 
     tester.injectElements(
         // assigned to [-60, 40), [-30, 70), [0, 100)
@@ -1212,7 +1212,7 @@ public class ReduceFnRunnerTest {
 
     ReduceFnTester<Integer, Integer, IntervalWindow> tester =
         ReduceFnTester
-            .combining(strategy, new Sum.SumIntegerFn().<String>asKeyedFn(), VarIntCoder.of());
+            .combining(strategy, Sum.ofIntegers().<String>asKeyedFn(), VarIntCoder.of());
 
     tester.advanceInputWatermark(new Instant(0));
     tester.advanceProcessingTime(new Instant(0));
@@ -1268,7 +1268,7 @@ public class ReduceFnRunnerTest {
 
     ReduceFnTester<Integer, Integer, IntervalWindow> tester =
         ReduceFnTester
-            .combining(strategy, new Sum.SumIntegerFn().<String>asKeyedFn(), VarIntCoder.of());
+            .combining(strategy, Sum.ofIntegers().<String>asKeyedFn(), VarIntCoder.of());
 
     tester.advanceInputWatermark(new Instant(0));
     tester.advanceProcessingTime(new Instant(0));

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 890195a..226f5f0 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -638,7 +638,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
 
     @Override
     public CombineFn<Long, ?, Long> getCombineFn() {
-      return new Sum.SumLongFn();
+      return Sum.ofLongs();
     }
 
     public long getSum() {

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index bb11923..2bc0d8d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -147,10 +147,10 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
       reduceFn = SystemReduceFn.buffering(valueCoder);
       droppedDueToClosedWindow = aggregatorChanges.createSystemAggregator(stepContext,
           GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER,
-          new Sum.SumLongFn());
+          Sum.ofLongs());
       droppedDueToLateness = aggregatorChanges.createSystemAggregator(stepContext,
           GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER,
-          new Sum.SumLongFn());
+          Sum.ofLongs());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
index f770800..37524eb 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
@@ -26,7 +26,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.runners.core.ExecutionContext.StepContext;
 import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Sum.SumIntegerFn;
+import org.apache.beam.sdk.transforms.Sum;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -61,13 +61,13 @@ public class AggregatorContainerTest {
   @Test
   public void addsAggregatorsOnCommit() {
     AggregatorContainer.Mutator mutator = container.createMutator();
-    mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", new SumIntegerFn()).addValue(5);
+    mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(5);
     mutator.commit();
 
     assertThat((Integer) container.getAggregate(STEP_NAME, "sum_int"), equalTo(5));
 
     mutator = container.createMutator();
-    mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", new SumIntegerFn()).addValue(8);
+    mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(8);
 
     assertThat("Shouldn't update value until commit",
         (Integer) container.getAggregate(STEP_NAME, "sum_int"), equalTo(5));
@@ -81,14 +81,14 @@ public class AggregatorContainerTest {
     mutator.commit();
 
     thrown.expect(IllegalStateException.class);
-    mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", new SumIntegerFn()).addValue(5);
+    mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(5);
   }
 
   @Test
   public void failToAddValueAfterCommit() {
     AggregatorContainer.Mutator mutator = container.createMutator();
     Aggregator<Integer, ?> aggregator =
-        mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", new SumIntegerFn());
+        mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", Sum.ofIntegers());
     mutator.commit();
 
     thrown.expect(IllegalStateException.class);
@@ -99,12 +99,12 @@ public class AggregatorContainerTest {
   public void failToAddValueAfterCommitWithPrevious() {
     AggregatorContainer.Mutator mutator = container.createMutator();
     mutator.createAggregatorForDoFn(
-        fn, stepContext, "sum_int", new SumIntegerFn()).addValue(5);
+        fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(5);
     mutator.commit();
 
     mutator = container.createMutator();
     Aggregator<Integer, ?> aggregator = mutator.createAggregatorForDoFn(
-        fn, stepContext, "sum_int", new SumIntegerFn());
+        fn, stepContext, "sum_int", Sum.ofIntegers());
     mutator.commit();
 
     thrown.expect(IllegalStateException.class);
@@ -123,7 +123,7 @@ public class AggregatorContainerTest {
         @Override
         public void run() {
           mutator.createAggregatorForDoFn(
-              fn, stepContext, "sum_int", new SumIntegerFn()).addValue(value);
+              fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(value);
           mutator.commit();
         }
       });

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
index 12ef66c..4284f3e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
@@ -167,7 +167,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
   public void testAccumulatorCombiningStateWithUnderlying() throws CannotProvideCoderException {
     CopyOnAccessInMemoryStateInternals<String> underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-    CombineFn<Long, long[], Long> sumLongFn = new Sum.SumLongFn();
+    CombineFn<Long, long[], Long> sumLongFn = Sum.ofLongs();
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
     CoderRegistry reg = pipeline.getCoderRegistry();
@@ -197,7 +197,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
   public void testKeyedAccumulatorCombiningStateWithUnderlying() throws Exception {
     CopyOnAccessInMemoryStateInternals<String> underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-    KeyedCombineFn<String, Long, long[], Long> sumLongFn = new Sum.SumLongFn().asKeyedFn();
+    KeyedCombineFn<String, Long, long[], Long> sumLongFn = Sum.ofLongs().asKeyedFn();
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
     CoderRegistry reg = pipeline.getCoderRegistry();

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 15340da..ad6e32d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -45,7 +45,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.Sum.SumLongFn;
+import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -250,7 +250,7 @@ public class EvaluationContextTest {
         "STEP", createdProducer.getTransform().getName());
     AggregatorContainer container = context.getAggregatorContainer();
     AggregatorContainer.Mutator mutator = container.createMutator();
-    mutator.createAggregatorForDoFn(fn, stepContext, "foo", new SumLongFn()).addValue(4L);
+    mutator.createAggregatorForDoFn(fn, stepContext, "foo", Sum.ofLongs()).addValue(4L);
 
     TransformResult<?> result =
         StepTransformResult.withoutHold(createdProducer)
@@ -260,7 +260,7 @@ public class EvaluationContextTest {
     assertThat((Long) context.getAggregatorContainer().getAggregate("STEP", "foo"), equalTo(4L));
 
     AggregatorContainer.Mutator mutatorAgain = container.createMutator();
-    mutatorAgain.createAggregatorForDoFn(fn, stepContext, "foo", new SumLongFn()).addValue(12L);
+    mutatorAgain.createAggregatorForDoFn(fn, stepContext, "foo", Sum.ofLongs()).addValue(12L);
 
     TransformResult<?> secondResult =
         StepTransformResult.withoutHold(downstreamProducer)

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
index b6b3c1a..6ae4cf8 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
@@ -46,7 +46,7 @@ public class WordCount {
    */
   public static class ExtractWordsFn extends DoFn<String, String> {
     private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", new Sum.SumLongFn());
+        createAggregator("emptyLines", Sum.ofLongs());
 
     @ProcessElement
     public void processElement(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
index 3405981..f33e616 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
@@ -314,7 +314,7 @@ public class AutoComplete {
 
   static class ExtractWordsFn extends DoFn<String, String> {
     private final Aggregator<Long, Long> emptyLines =
-            createAggregator("emptyLines", new Sum.SumLongFn());
+            createAggregator("emptyLines", Sum.ofLongs());
 
     @ProcessElement
     public void processElement(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
index 42c42f3..ee0e874 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
@@ -55,7 +55,7 @@ public class KafkaWindowedWordCountExample {
    */
   public static class ExtractWordsFn extends DoFn<String, String> {
     private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", new Sum.SumLongFn());
+        createAggregator("emptyLines", Sum.ofLongs());
 
     @ProcessElement
     public void processElement(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
index 2246bdd..792c214 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
@@ -66,7 +66,7 @@ public class WindowedWordCount {
 
   static class ExtractWordsFn extends DoFn<String, String> {
     private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", new Sum.SumLongFn());
+        createAggregator("emptyLines", Sum.ofLongs());
 
     @ProcessElement
     public void processElement(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
index 628212a..126f611 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
@@ -70,7 +70,7 @@ public class FlinkStateInternalsTest {
       StateTags.value("stringValue", StringUtf8Coder.of());
   private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>>
       SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
-          "sumInteger", VarIntCoder.of(), new Sum.SumIntegerFn());
+          "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
   private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
       StateTags.bag("stringBag", StringUtf8Coder.of());
   private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 1890da1..6999e03 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -420,7 +420,7 @@ public class DataflowPipelineJobTest {
   @Test
   public void testGetAggregatorValuesWithSingleMetricUpdateReturnsSingletonCollection()
       throws IOException, AggregatorRetrievalException {
-    CombineFn<Long, long[], Long> combineFn = new Sum.SumLongFn();
+    CombineFn<Long, long[], Long> combineFn = Sum.ofLongs();
     String aggregatorName = "agg";
     Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
     @SuppressWarnings("unchecked")
@@ -468,7 +468,7 @@ public class DataflowPipelineJobTest {
   @Test
   public void testGetAggregatorValuesWithMultipleMetricUpdatesReturnsCollection()
       throws IOException, AggregatorRetrievalException {
-    CombineFn<Long, long[], Long> combineFn = new Sum.SumLongFn();
+    CombineFn<Long, long[], Long> combineFn = Sum.ofLongs();
     String aggregatorName = "agg";
     Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
 
@@ -536,7 +536,7 @@ public class DataflowPipelineJobTest {
   @Test
   public void testGetAggregatorValuesWithUnrelatedMetricUpdateIgnoresUpdate()
       throws IOException, AggregatorRetrievalException {
-    CombineFn<Long, long[], Long> combineFn = new Sum.SumLongFn();
+    CombineFn<Long, long[], Long> combineFn = Sum.ofLongs();
     String aggregatorName = "agg";
     Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
     @SuppressWarnings("unchecked")
@@ -600,7 +600,7 @@ public class DataflowPipelineJobTest {
   @Test
   public void testGetAggregatorValuesWhenClientThrowsExceptionThrowsAggregatorRetrievalException()
       throws IOException, AggregatorRetrievalException {
-    CombineFn<Long, long[], Long> combineFn = new Sum.SumLongFn();
+    CombineFn<Long, long[], Long> combineFn = Sum.ofLongs();
     String aggregatorName = "agg";
     Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
index 4e96466..52fe994 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
@@ -153,7 +153,9 @@ public class NamedAggregators implements Serializable {
   }
 
   /**
-   * =&gt; combineFunction in data flow.
+   * @param <InputT>    Input data type
+   * @param <InterT> Intermediate data type (useful for averages)
+   * @param <OutputT>   Output data type
    */
   public static class CombineFunctionState<InputT, InterT, OutputT>
       implements State<InputT, InterT, OutputT> {

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
index 1252d12..da14ee2 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
@@ -46,7 +46,7 @@ public class WordCount {
    */
   static class ExtractWordsFn extends DoFn<String, String> {
     private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", new Sum.SumLongFn());
+        createAggregator("emptyLines", Sum.ofLongs());
 
     @ProcessElement
     public void processElement(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
index 5432d58..b615132 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
@@ -78,7 +78,7 @@ public class SparkGroupAlsoByWindowFn<K, InputT, W extends BoundedWindow>
     droppedDueToClosedWindow = runtimeContext.createAggregator(
         accumulator,
         GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER,
-        new Sum.SumLongFn());
+        Sum.ofLongs());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
index 01b6b54..9c3d79f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
@@ -32,10 +32,6 @@ import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Max;
-import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.spark.Accumulator;
 
 /**
@@ -92,18 +88,26 @@ public class SparkRuntimeContext implements Serializable {
       Combine.CombineFn<? super InputT, InterT, OutputT> combineFn) {
     @SuppressWarnings("unchecked")
     Aggregator<InputT, OutputT> aggregator = (Aggregator<InputT, OutputT>) aggregators.get(named);
-    if (aggregator == null) {
-      @SuppressWarnings("unchecked")
-      NamedAggregators.CombineFunctionState<InputT, InterT, OutputT> state =
-          new NamedAggregators.CombineFunctionState<>(
-              (Combine.CombineFn<InputT, InterT, OutputT>) combineFn,
-              (Coder<InputT>) getCoder(combineFn),
-              this);
-      accum.add(new NamedAggregators(named, state));
-      aggregator = new SparkAggregator<>(named, state);
-      aggregators.put(named, aggregator);
+    try {
+      if (aggregator == null) {
+        @SuppressWarnings("unchecked")
+        final
+        NamedAggregators.CombineFunctionState<InputT, InterT, OutputT> state =
+            new NamedAggregators.CombineFunctionState<>(
+                (Combine.CombineFn<InputT, InterT, OutputT>) combineFn,
+                // hidden assumption: InputT == OutputT
+                (Coder<InputT>) getCoderRegistry().getCoder(combineFn.getOutputType()),
+                this);
+
+        accum.add(new NamedAggregators(named, state));
+        aggregator = new SparkAggregator<>(named, state);
+        aggregators.put(named, aggregator);
+      }
+      return aggregator;
+    } catch (CannotProvideCoderException e) {
+      throw new RuntimeException(String.format("Unable to create an aggregator named: [%s]", named),
+                                 e);
     }
-    return aggregator;
   }
 
   public CoderRegistry getCoderRegistry() {
@@ -114,35 +118,6 @@ public class SparkRuntimeContext implements Serializable {
     return coderRegistry;
   }
 
-  private Coder<?> getCoder(Combine.CombineFn<?, ?, ?> combiner) {
-    try {
-      if (combiner.getClass() == Sum.SumIntegerFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
-      } else if (combiner.getClass() == Sum.SumLongFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
-      } else if (combiner.getClass() == Sum.SumDoubleFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
-      } else if (combiner.getClass() == Min.MinIntegerFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
-      } else if (combiner.getClass() == Min.MinLongFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
-      } else if (combiner.getClass() == Min.MinDoubleFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
-      } else if (combiner.getClass() == Max.MaxIntegerFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
-      } else if (combiner.getClass() == Max.MaxLongFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
-      } else if (combiner.getClass() == Max.MaxDoubleFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
-      } else {
-        throw new IllegalArgumentException("unsupported combiner in Aggregator: "
-            + combiner.getClass().getName());
-      }
-    } catch (CannotProvideCoderException e) {
-      throw new IllegalStateException("Could not determine default coder for combiner", e);
-    }
-  }
-
   /**
    * Initialize spark aggregators exactly once.
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index 2718b5f..ab04c5c 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -182,7 +182,7 @@ public class ResumeFromCheckpointStreamingTest {
   private static class FormatAsText extends DoFn<KV<String, String>, String> {
 
     private final Aggregator<Long, Long> aggregator =
-        createAggregator("processedMessages", new Sum.SumLongFn());
+        createAggregator("processedMessages", Sum.ofLongs());
 
     @ProcessElement
     public void process(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
index 0284b3d..cd9de92 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
@@ -97,9 +97,9 @@ public final class PAssertStreaming implements Serializable {
 
   private static class AssertDoFn<T> extends DoFn<Iterable<T>, Void> {
     private final Aggregator<Integer, Integer> success =
-        createAggregator(PAssert.SUCCESS_COUNTER, new Sum.SumIntegerFn());
+        createAggregator(PAssert.SUCCESS_COUNTER, Sum.ofIntegers());
     private final Aggregator<Integer, Integer> failure =
-        createAggregator(PAssert.FAILURE_COUNTER, new Sum.SumIntegerFn());
+        createAggregator(PAssert.FAILURE_COUNTER, Sum.ofIntegers());
     private final T[] expected;
 
     AssertDoFn(T[] expected) {

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
index 58414c6..75f6b7d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
@@ -154,7 +154,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
    */
   private static class ShardFn<T> extends DoFn<T, KV<Integer, OutgoingMessage>> {
     private final Aggregator<Long, Long> elementCounter =
-        createAggregator("elements", new Sum.SumLongFn());
+        createAggregator("elements", Sum.ofLongs());
     private final Coder<T> elementCoder;
     private final int numShards;
     private final RecordIdMethod recordIdMethod;
@@ -219,11 +219,11 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
     private transient PubsubClient pubsubClient;
 
     private final Aggregator<Long, Long> batchCounter =
-        createAggregator("batches", new Sum.SumLongFn());
+        createAggregator("batches", Sum.ofLongs());
     private final Aggregator<Long, Long> elementCounter =
-        createAggregator("elements", new Sum.SumLongFn());
+        createAggregator("elements", Sum.ofLongs());
     private final Aggregator<Long, Long> byteCounter =
-        createAggregator("bytes", new Sum.SumLongFn());
+        createAggregator("bytes", Sum.ofLongs());
 
     WriterFn(
         PubsubClientFactory pubsubFactory, ValueProvider<TopicPath> topic,

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
index da3b437..4b3792d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
@@ -59,7 +59,6 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.Sum.SumLongFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -216,7 +215,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
         }
       };
 
-  private static final Combine.BinaryCombineLongFn SUM = new SumLongFn();
+  private static final Combine.BinaryCombineLongFn SUM = Sum.ofLongs();
 
   // ================================================================================
   // Checkpoint
@@ -1159,7 +1158,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
 
   private static class StatsFn<T> extends DoFn<T, T> {
     private final Aggregator<Long, Long> elementCounter =
-        createAggregator("elements", new Sum.SumLongFn());
+        createAggregator("elements", Sum.ofLongs());
 
     private final PubsubClientFactory pubsubFactory;
     private final ValueProvider<SubscriptionPath> subscription;

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index b23f4f3..b57f4a9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -1019,9 +1019,9 @@ public class PAssert {
   private static class SideInputCheckerDoFn<ActualT> extends DoFn<Integer, Void> {
     private final SerializableFunction<ActualT, Void> checkerFn;
     private final Aggregator<Integer, Integer> success =
-        createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
+        createAggregator(SUCCESS_COUNTER, Sum.ofIntegers());
     private final Aggregator<Integer, Integer> failure =
-        createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
+        createAggregator(FAILURE_COUNTER, Sum.ofIntegers());
     private final PCollectionView<ActualT> actual;
 
     private SideInputCheckerDoFn(
@@ -1054,9 +1054,9 @@ public class PAssert {
   private static class GroupedValuesCheckerDoFn<ActualT> extends DoFn<ActualT, Void> {
     private final SerializableFunction<ActualT, Void> checkerFn;
     private final Aggregator<Integer, Integer> success =
-        createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
+        createAggregator(SUCCESS_COUNTER, Sum.ofIntegers());
     private final Aggregator<Integer, Integer> failure =
-        createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
+        createAggregator(FAILURE_COUNTER, Sum.ofIntegers());
 
     private GroupedValuesCheckerDoFn(SerializableFunction<ActualT, Void> checkerFn) {
       this.checkerFn = checkerFn;
@@ -1079,9 +1079,9 @@ public class PAssert {
   private static class SingletonCheckerDoFn<ActualT> extends DoFn<Iterable<ActualT>, Void> {
     private final SerializableFunction<ActualT, Void> checkerFn;
     private final Aggregator<Integer, Integer> success =
-        createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
+        createAggregator(SUCCESS_COUNTER, Sum.ofIntegers());
     private final Aggregator<Integer, Integer> failure =
-        createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
+        createAggregator(FAILURE_COUNTER, Sum.ofIntegers());
 
     private SingletonCheckerDoFn(SerializableFunction<ActualT, Void> checkerFn) {
       this.checkerFn = checkerFn;

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
index 0990ca4..696bed9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
@@ -110,13 +110,69 @@ public class Max {
   }
 
   /**
+   * A {@code CombineFn} that computes the maximum of a collection of {@code Integer}s, useful as an
+   * argument to {@link Combine#globally} or {@link Combine#perKey}.
+   */
+  public static Combine.BinaryCombineIntegerFn ofIntegers() {
+    return new Max.MaxIntegerFn();
+  }
+
+  /**
+   * A {@code CombineFn} that computes the maximum of a collection of {@code Long}s, useful as an
+   * argument to {@link Combine#globally} or {@link Combine#perKey}.
+   */
+  public static Combine.BinaryCombineLongFn ofLongs() {
+    return new Max.MaxLongFn();
+  }
+
+  /**
+   * A {@code CombineFn} that computes the maximum of a collection of {@code Double}s, useful as an
+   * argument to {@link Combine#globally} or {@link Combine#perKey}.
+   */
+  public static Combine.BinaryCombineDoubleFn ofDoubles() {
+    return new Max.MaxDoubleFn();
+  }
+
+  /**
+   * A {@code CombineFn} that computes the maximum of a collection of elements of type {@code T}
+   * using an arbitrary {@link Comparator} and {@code identity},
+   * useful as an argument to {@link Combine#globally} or {@link Combine#perKey}.
+   *
+   * @param <T> the type of the values being compared
+   */
+  public static <T, ComparatorT extends Comparator<? super T> & Serializable>
+  BinaryCombineFn<T> of(final T identity, final ComparatorT comparator) {
+    return new MaxFn<T>(identity, comparator);
+  }
+
+  /**
+   * A {@code CombineFn} that computes the maximum of a collection of elements of type {@code T}
+   * using an arbitrary {@link Comparator}, useful as an argument to {@link Combine#globally} or
+   * {@link Combine#perKey}.
+   *
+   * @param <T> the type of the values being compared
+   */
+  public static <T, ComparatorT extends Comparator<? super T> & Serializable>
+  BinaryCombineFn<T> of(final ComparatorT comparator) {
+    return new MaxFn<T>(null, comparator);
+  }
+
+  public static <T extends Comparable<? super T>> BinaryCombineFn<T> naturalOrder(T identity) {
+    return new MaxFn<T>(identity, new Top.Largest<T>());
+  }
+
+  public static <T extends Comparable<? super T>> BinaryCombineFn<T> naturalOrder() {
+    return new MaxFn<T>(null, new Top.Largest<T>());
+  }
+
+  /**
    * Returns a {@code PTransform} that takes an input {@code PCollection<T>} and returns a {@code
    * PCollection<T>} whose contents is the maximum according to the natural ordering of {@code T}
    * of the input {@code PCollection}'s elements, or {@code null} if there are no elements.
    */
   public static <T extends Comparable<? super T>>
   Combine.Globally<T, T> globally() {
-    return Combine.<T, T>globally(MaxFn.<T>naturalOrder());
+    return Combine.<T, T>globally(Max.<T>naturalOrder());
   }
 
   /**
@@ -129,7 +185,7 @@ public class Max {
    */
   public static <K, T extends Comparable<? super T>>
   Combine.PerKey<K, T, T> perKey() {
-    return Combine.<K, T, T>perKey(MaxFn.<T>naturalOrder());
+    return Combine.<K, T, T>perKey(Max.<T>naturalOrder());
   }
 
   /**
@@ -139,7 +195,7 @@ public class Max {
    */
   public static <T, ComparatorT extends Comparator<? super T> & Serializable>
   Combine.Globally<T, T> globally(ComparatorT comparator) {
-    return Combine.<T, T>globally(MaxFn.of(comparator));
+    return Combine.<T, T>globally(Max.of(comparator));
   }
 
   /**
@@ -151,19 +207,12 @@ public class Max {
    */
   public static <K, T, ComparatorT extends Comparator<? super T> & Serializable>
   Combine.PerKey<K, T, T> perKey(ComparatorT comparator) {
-    return Combine.<K, T, T>perKey(MaxFn.of(comparator));
+    return Combine.<K, T, T>perKey(Max.of(comparator));
   }
 
   /////////////////////////////////////////////////////////////////////////////
 
-  /**
-   * A {@code CombineFn} that computes the maximum of a collection of elements of type {@code T}
-   * using an arbitrary {@link Comparator}, useful as an argument to {@link Combine#globally} or
-   * {@link Combine#perKey}.
-   *
-   * @param <T> the type of the values being compared
-   */
-  public static class MaxFn<T> extends BinaryCombineFn<T> {
+  private static class MaxFn<T> extends BinaryCombineFn<T> {
 
     private final T identity;
     private final Comparator<? super T> comparator;
@@ -174,24 +223,6 @@ public class Max {
       this.comparator = comparator;
     }
 
-    public static <T, ComparatorT extends Comparator<? super T> & Serializable>
-    MaxFn<T> of(T identity, ComparatorT comparator) {
-      return new MaxFn<T>(identity, comparator);
-    }
-
-    public static <T, ComparatorT extends Comparator<? super T> & Serializable>
-    MaxFn<T> of(ComparatorT comparator) {
-      return new MaxFn<T>(null, comparator);
-    }
-
-    public static <T extends Comparable<? super T>> MaxFn<T> naturalOrder(T identity) {
-      return new MaxFn<T>(identity, new Top.Largest<T>());
-    }
-
-    public static <T extends Comparable<? super T>> MaxFn<T> naturalOrder() {
-      return new MaxFn<T>(null, new Top.Largest<T>());
-    }
-
     @Override
     public T identity() {
       return identity;
@@ -210,11 +241,8 @@ public class Max {
     }
   }
 
-  /**
-   * A {@code CombineFn} that computes the maximum of a collection of {@code Integer}s, useful as an
-   * argument to {@link Combine#globally} or {@link Combine#perKey}.
-   */
-  public static class MaxIntegerFn extends Combine.BinaryCombineIntegerFn {
+  private static class MaxIntegerFn extends Combine.BinaryCombineIntegerFn {
+
     @Override
     public int apply(int left, int right) {
       return left >= right ? left : right;
@@ -226,11 +254,8 @@ public class Max {
     }
   }
 
-  /**
-   * A {@code CombineFn} that computes the maximum of a collection of {@code Long}s, useful as an
-   * argument to {@link Combine#globally} or {@link Combine#perKey}.
-   */
-  public static class MaxLongFn extends Combine.BinaryCombineLongFn {
+  private static class MaxLongFn extends Combine.BinaryCombineLongFn {
+
     @Override
     public long apply(long left, long right) {
       return left >= right ? left : right;
@@ -242,11 +267,8 @@ public class Max {
     }
   }
 
-  /**
-   * A {@code CombineFn} that computes the maximum of a collection of {@code Double}s, useful as an
-   * argument to {@link Combine#globally} or {@link Combine#perKey}.
-   */
-  public static class MaxDoubleFn extends Combine.BinaryCombineDoubleFn {
+  private static class MaxDoubleFn extends Combine.BinaryCombineDoubleFn {
+
     @Override
     public double apply(double left, double right) {
       return left >= right ? left : right;

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
index cb77ba3..7e62e9d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
@@ -84,9 +84,6 @@ public class Mean {
     return Combine.<K, NumT, Double>perKey(new MeanFn<>());
   }
 
-
-  /////////////////////////////////////////////////////////////////////////////
-
   /**
    * A {@code Combine.CombineFn} that computes the arithmetic mean
    * (a.k.a. average) of an {@code Iterable} of numbers of type
@@ -97,13 +94,19 @@ public class Mean {
    *
    * @param <NumT> the type of the {@code Number}s being combined
    */
-  static class MeanFn<NumT extends Number>
+  public static <NumT extends Number>
+  Combine.AccumulatingCombineFn<NumT, CountSum<NumT>, Double> of() {
+    return new MeanFn<>();
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  private static class MeanFn<NumT extends Number>
   extends Combine.AccumulatingCombineFn<NumT, CountSum<NumT>, Double> {
     /**
      * Constructs a combining function that computes the mean over
      * a collection of values of type {@code N}.
      */
-    public MeanFn() {}
 
     @Override
     public CountSum<NumT> createAccumulator() {

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
index 5003594..b208929 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
@@ -110,13 +110,69 @@ public class Min {
   }
 
   /**
+   * A {@code CombineFn} that computes the minimum of a collection of {@code Integer}s, useful as an
+   * argument to {@link Combine#globally} or {@link Combine#perKey}.
+   */
+  public static Combine.BinaryCombineIntegerFn ofIntegers() {
+    return new Min.MinIntegerFn();
+  }
+
+  /**
+   * A {@code CombineFn} that computes the minimum of a collection of {@code Long}s, useful as an
+   * argument to {@link Combine#globally} or {@link Combine#perKey}.
+   */
+  public static Combine.BinaryCombineLongFn ofLongs() {
+    return new Min.MinLongFn();
+  }
+
+  /**
+   * A {@code CombineFn} that computes the minimum of a collection of {@code Double}s, useful as an
+   * argument to {@link Combine#globally} or {@link Combine#perKey}.
+   */
+  public static Combine.BinaryCombineDoubleFn ofDoubles() {
+    return new Min.MinDoubleFn();
+  }
+
+  /**
+   * A {@code CombineFn} that computes the maximum of a collection of elements of type {@code T}
+   * using an arbitrary {@link Comparator} and an {@code identity},
+   * useful as an argument to {@link Combine#globally} or {@link Combine#perKey}.
+   *
+   * @param <T> the type of the values being compared
+   */
+  public static <T, ComparatorT extends Comparator<? super T> & Serializable>
+  BinaryCombineFn<T> of(T identity, ComparatorT comparator) {
+    return new MinFn<T>(identity, comparator);
+  }
+
+  /**
+   * A {@code CombineFn} that computes the maximum of a collection of elements of type {@code T}
+   * using an arbitrary {@link Comparator}, useful as an argument to {@link Combine#globally} or
+   * {@link Combine#perKey}.
+   *
+   * @param <T> the type of the values being compared
+   */
+  public static <T, ComparatorT extends Comparator<? super T> & Serializable>
+  BinaryCombineFn<T> of(ComparatorT comparator) {
+    return new MinFn<T>(null, comparator);
+  }
+
+  public static <T extends Comparable<? super T>> BinaryCombineFn<T> naturalOrder(T identity) {
+    return new MinFn<T>(identity, new Top.Largest<T>());
+  }
+
+  public static <T extends Comparable<? super T>> BinaryCombineFn<T> naturalOrder() {
+    return new MinFn<T>(null, new Top.Largest<T>());
+  }
+
+  /**
    * Returns a {@code PTransform} that takes an input {@code PCollection<T>} and returns a {@code
    * PCollection<T>} whose contents is the minimum according to the natural ordering of {@code T}
    * of the input {@code PCollection}'s elements, or {@code null} if there are no elements.
    */
   public static <T extends Comparable<? super T>>
   Combine.Globally<T, T> globally() {
-    return Combine.<T, T>globally(MinFn.<T>naturalOrder());
+    return Combine.<T, T>globally(Min.<T>naturalOrder());
   }
 
   /**
@@ -129,7 +185,7 @@ public class Min {
    */
   public static <K, T extends Comparable<? super T>>
   Combine.PerKey<K, T, T> perKey() {
-    return Combine.<K, T, T>perKey(MinFn.<T>naturalOrder());
+    return Combine.<K, T, T>perKey(Min.<T>naturalOrder());
   }
 
   /**
@@ -139,7 +195,7 @@ public class Min {
    */
   public static <T, ComparatorT extends Comparator<? super T> & Serializable>
   Combine.Globally<T, T> globally(ComparatorT comparator) {
-    return Combine.<T, T>globally(MinFn.of(comparator));
+    return Combine.<T, T>globally(Min.of(comparator));
   }
 
   /**
@@ -151,19 +207,12 @@ public class Min {
    */
   public static <K, T, ComparatorT extends Comparator<? super T> & Serializable>
   Combine.PerKey<K, T, T> perKey(ComparatorT comparator) {
-    return Combine.<K, T, T>perKey(MinFn.of(comparator));
+    return Combine.<K, T, T>perKey(Min.of(comparator));
   }
 
   /////////////////////////////////////////////////////////////////////////////
 
-  /**
-   * A {@code CombineFn} that computes the maximum of a collection of elements of type {@code T}
-   * using an arbitrary {@link Comparator}, useful as an argument to {@link Combine#globally} or
-   * {@link Combine#perKey}.
-   *
-   * @param <T> the type of the values being compared
-   */
-  public static class MinFn<T> extends BinaryCombineFn<T> {
+  private static class MinFn<T> extends BinaryCombineFn<T> {
 
     private final T identity;
     private final Comparator<? super T> comparator;
@@ -174,24 +223,6 @@ public class Min {
       this.comparator = comparator;
     }
 
-    public static <T, ComparatorT extends Comparator<? super T> & Serializable>
-    MinFn<T> of(T identity, ComparatorT comparator) {
-      return new MinFn<T>(identity, comparator);
-    }
-
-    public static <T, ComparatorT extends Comparator<? super T> & Serializable>
-    MinFn<T> of(ComparatorT comparator) {
-      return new MinFn<T>(null, comparator);
-    }
-
-    public static <T extends Comparable<? super T>> MinFn<T> naturalOrder(T identity) {
-      return new MinFn<T>(identity, new Top.Largest<T>());
-    }
-
-    public static <T extends Comparable<? super T>> MinFn<T> naturalOrder() {
-      return new MinFn<T>(null, new Top.Largest<T>());
-    }
-
     @Override
     public T identity() {
       return identity;
@@ -210,11 +241,7 @@ public class Min {
     }
   }
 
-  /**
-   * A {@code CombineFn} that computes the minimum of a collection of {@code Integer}s, useful as an
-   * argument to {@link Combine#globally} or {@link Combine#perKey}.
-   */
-  public static class MinIntegerFn extends Combine.BinaryCombineIntegerFn {
+  private static class MinIntegerFn extends Combine.BinaryCombineIntegerFn {
 
     @Override
     public int apply(int left, int right) {
@@ -227,11 +254,8 @@ public class Min {
     }
   }
 
-  /**
-   * A {@code CombineFn} that computes the minimum of a collection of {@code Long}s, useful as an
-   * argument to {@link Combine#globally} or {@link Combine#perKey}.
-   */
-  public static class MinLongFn extends Combine.BinaryCombineLongFn {
+  private static class MinLongFn extends Combine.BinaryCombineLongFn {
+
     @Override
     public long apply(long left, long right) {
       return left <= right ? left : right;
@@ -243,11 +267,7 @@ public class Min {
     }
   }
 
-  /**
-   * A {@code CombineFn} that computes the minimum of a collection of {@code Double}s, useful as an
-   * argument to {@link Combine#globally} or {@link Combine#perKey}.
-   */
-  public static class MinDoubleFn extends Combine.BinaryCombineDoubleFn {
+  private static class MinDoubleFn extends Combine.BinaryCombineDoubleFn {
 
     @Override
     public double apply(double left, double right) {

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
index 48eafc3..ccade4d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
@@ -50,7 +50,7 @@ public class Sum {
    * {@code 0} if there are no elements.
    */
   public static Combine.Globally<Integer, Integer> integersGlobally() {
-    return Combine.globally(new SumIntegerFn());
+    return Combine.globally(Sum.ofIntegers());
   }
 
   /**
@@ -62,7 +62,7 @@ public class Sum {
    * that key in the input {@code PCollection}.
    */
   public static <K> Combine.PerKey<K, Integer, Integer> integersPerKey() {
-    return Combine.<K, Integer, Integer>perKey(new SumIntegerFn());
+    return Combine.<K, Integer, Integer>perKey(Sum.ofIntegers());
   }
 
   /**
@@ -73,7 +73,7 @@ public class Sum {
    * {@code 0} if there are no elements.
    */
   public static Combine.Globally<Long, Long> longsGlobally() {
-    return Combine.globally(new SumLongFn());
+    return Combine.globally(Sum.ofLongs());
   }
 
   /**
@@ -85,7 +85,7 @@ public class Sum {
    * that key in the input {@code PCollection}.
    */
   public static <K> Combine.PerKey<K, Long, Long> longsPerKey() {
-    return Combine.<K, Long, Long>perKey(new SumLongFn());
+    return Combine.<K, Long, Long>perKey(Sum.ofLongs());
   }
 
   /**
@@ -96,7 +96,7 @@ public class Sum {
    * {@code 0} if there are no elements.
    */
   public static Combine.Globally<Double, Double> doublesGlobally() {
-    return Combine.globally(new SumDoubleFn());
+    return Combine.globally(Sum.ofDoubles());
   }
 
   /**
@@ -108,18 +108,40 @@ public class Sum {
    * that key in the input {@code PCollection}.
    */
   public static <K> Combine.PerKey<K, Double, Double> doublesPerKey() {
-    return Combine.<K, Double, Double>perKey(new SumDoubleFn());
+    return Combine.<K, Double, Double>perKey(Sum.ofDoubles());
   }
 
+  /**
+   * A {@code SerializableFunction} that computes the sum of an
+   * {@code Iterable} of {@code Integer}s, useful as an argument to
+   * {@link Combine#globally} or {@link Combine#perKey}.
+   */
+  public static Combine.BinaryCombineIntegerFn ofIntegers() {
+    return new SumIntegerFn();
+  }
 
-  /////////////////////////////////////////////////////////////////////////////
+  /**
+   * A {@code SerializableFunction} that computes the sum of an
+   * {@code Iterable} of {@code Double}s, useful as an argument to
+   * {@link Combine#globally} or {@link Combine#perKey}.
+   */
+  public static Combine.BinaryCombineDoubleFn ofDoubles() {
+    return new SumDoubleFn();
+  }
 
   /**
    * A {@code SerializableFunction} that computes the sum of an
-   * {@code Iterable} of {@code Integer}s, useful as an argument to
+   * {@code Iterable} of {@code Long}s, useful as an argument to
    * {@link Combine#globally} or {@link Combine#perKey}.
    */
-  public static class SumIntegerFn extends Combine.BinaryCombineIntegerFn {
+  public static Combine.BinaryCombineLongFn ofLongs() {
+    return new SumLongFn();
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  private static class SumIntegerFn extends Combine.BinaryCombineIntegerFn {
+
     @Override
     public int apply(int a, int b) {
       return a + b;
@@ -131,13 +153,8 @@ public class Sum {
     }
   }
 
-  /**
-   * A {@code SerializableFunction} that computes the sum of an
-   * {@code Iterable} of {@code Long}s, useful as an argument to
-   * {@link Combine#globally} or {@link Combine#perKey}.
-   */
-  public static class SumLongFn
-      extends Combine.BinaryCombineLongFn {
+  private static class SumLongFn extends Combine.BinaryCombineLongFn {
+
     @Override
     public long apply(long a, long b) {
       return a + b;
@@ -149,12 +166,8 @@ public class Sum {
     }
   }
 
-  /**
-   * A {@code SerializableFunction} that computes the sum of an
-   * {@code Iterable} of {@code Double}s, useful as an argument to
-   * {@link Combine#globally} or {@link Combine#perKey}.
-   */
-  public static class SumDoubleFn extends Combine.BinaryCombineDoubleFn {
+  private static class SumDoubleFn extends Combine.BinaryCombineDoubleFn {
+
     @Override
     public double apply(double a, double b) {
       return a + b;

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
index 9daecb2..6392fb5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
@@ -51,7 +51,7 @@ public abstract class AfterDelayFromFirstElement extends OnceTrigger {
   protected static final StateTag<Object, AccumulatorCombiningState<Instant,
                                               Combine.Holder<Instant>, Instant>> DELAYED_UNTIL_TAG =
       StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
-          "delayed", InstantCoder.of(), Min.MinFn.<Instant>naturalOrder()));
+          "delayed", InstantCoder.of(), Min.<Instant>naturalOrder()));
 
   private static final PeriodFormatter PERIOD_FORMATTER = PeriodFormat.wordBased(Locale.ENGLISH);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
index 4a706e6..d66e1c4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
@@ -37,7 +37,7 @@ public class AfterPane extends OnceTrigger {
 private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Long>>
       ELEMENTS_IN_PANE_TAG =
       StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
-          "count", VarLongCoder.of(), new Sum.SumLongFn()));
+          "count", VarLongCoder.of(), Sum.ofLongs()));
 
   private final int countElems;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
index 1bf2c3d..22efd85 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
@@ -70,8 +70,8 @@ public class AggregatorPipelineExtractorTest {
     AggregatorProvidingDoFn<ThreadGroup, StrictMath> fn = new AggregatorProvidingDoFn<>();
     when(bound.getFn()).thenReturn(fn);
 
-    Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn());
-    Aggregator<Integer, Integer> aggregatorTwo = fn.addAggregator(new Min.MinIntegerFn());
+    Aggregator<Long, Long> aggregatorOne = fn.addAggregator(Sum.ofLongs());
+    Aggregator<Integer, Integer> aggregatorTwo = fn.addAggregator(Min.ofIntegers());
 
     TransformHierarchy.Node transformNode = mock(TransformHierarchy.Node.class);
     when(transformNode.getTransform()).thenReturn(bound);
@@ -98,8 +98,8 @@ public class AggregatorPipelineExtractorTest {
     AggregatorProvidingDoFn<Object, Void> fn = new AggregatorProvidingDoFn<>();
     when(bound.getFn()).thenReturn(fn);
 
-    Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Max.MaxLongFn());
-    Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(new Min.MinDoubleFn());
+    Aggregator<Long, Long> aggregatorOne = fn.addAggregator(Max.ofLongs());
+    Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(Min.ofDoubles());
 
     TransformHierarchy.Node transformNode = mock(TransformHierarchy.Node.class);
     when(transformNode.getTransform()).thenReturn(bound);
@@ -129,8 +129,8 @@ public class AggregatorPipelineExtractorTest {
     when(bound.getFn()).thenReturn(fn);
     when(otherBound.getFn()).thenReturn(fn);
 
-    Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn());
-    Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(new Min.MinDoubleFn());
+    Aggregator<Long, Long> aggregatorOne = fn.addAggregator(Sum.ofLongs());
+    Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(Min.ofDoubles());
 
     TransformHierarchy.Node transformNode = mock(TransformHierarchy.Node.class);
     when(transformNode.getTransform()).thenReturn(bound);
@@ -160,7 +160,7 @@ public class AggregatorPipelineExtractorTest {
     ParDo.Bound bound = mock(ParDo.Bound.class, "Bound");
 
     AggregatorProvidingDoFn<ThreadGroup, Void> fn = new AggregatorProvidingDoFn<>();
-    Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn());
+    Aggregator<Long, Long> aggregatorOne = fn.addAggregator(Sum.ofLongs());
 
     when(bound.getFn()).thenReturn(fn);
 
@@ -168,7 +168,7 @@ public class AggregatorPipelineExtractorTest {
     ParDo.BoundMulti otherBound = mock(ParDo.BoundMulti.class, "otherBound");
 
     AggregatorProvidingDoFn<Long, Long> otherFn = new AggregatorProvidingDoFn<>();
-    Aggregator<Double, Double> aggregatorTwo = otherFn.addAggregator(new Sum.SumDoubleFn());
+    Aggregator<Double, Double> aggregatorTwo = otherFn.addAggregator(Sum.ofDoubles());
 
     when(otherBound.getFn()).thenReturn(otherFn);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
index cdd4707..4d35e53 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
@@ -41,8 +41,6 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn;
 import org.apache.beam.sdk.transforms.CombineFns.CoCombineResult;
 import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import org.apache.beam.sdk.transforms.Max.MaxIntegerFn;
-import org.apache.beam.sdk.transforms.Min.MinIntegerFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -71,8 +69,8 @@ public class  CombineFnsTest {
 
     TupleTag<Integer> tag = new TupleTag<Integer>();
     CombineFns.compose()
-      .with(new GetIntegerFunction(), new MaxIntegerFn(), tag)
-      .with(new GetIntegerFunction(), new MinIntegerFn(), tag);
+      .with(new GetIntegerFunction(), Max.ofIntegers(), tag)
+      .with(new GetIntegerFunction(), Min.ofIntegers(), tag);
   }
 
   @Test
@@ -82,8 +80,8 @@ public class  CombineFnsTest {
 
     TupleTag<Integer> tag = new TupleTag<Integer>();
     CombineFns.composeKeyed()
-      .with(new GetIntegerFunction(), new MaxIntegerFn(), tag)
-      .with(new GetIntegerFunction(), new MinIntegerFn(), tag);
+      .with(new GetIntegerFunction(), Max.ofIntegers(), tag)
+      .with(new GetIntegerFunction(), Min.ofIntegers(), tag);
   }
 
   @Test
@@ -145,7 +143,7 @@ public class  CombineFnsTest {
         .apply(Combine.globally(CombineFns.compose()
             .with(
                 new GetIntegerFunction(),
-                new MaxIntegerFn(),
+                Max.ofIntegers(),
                 maxIntTag)
             .with(
                 new GetUserStringFunction(),
@@ -159,7 +157,7 @@ public class  CombineFnsTest {
         .apply(Combine.perKey(CombineFns.composeKeyed()
             .with(
                 new GetIntegerFunction(),
-                new MaxIntegerFn().<String>asKeyedFn(),
+                Max.ofIntegers().<String>asKeyedFn(),
                 maxIntTag)
             .with(
                 new GetUserStringFunction(),
@@ -203,7 +201,7 @@ public class  CombineFnsTest {
         .apply(Combine.globally(CombineFns.compose()
             .with(
                 new GetIntegerFunction(),
-                new MaxIntegerFn(),
+                Max.ofIntegers(),
                 maxIntTag)
             .with(
                 new GetUserStringFunction(),
@@ -219,7 +217,7 @@ public class  CombineFnsTest {
         .apply(Combine.perKey(CombineFns.composeKeyed()
             .with(
                 new GetIntegerFunction(),
-                new MaxIntegerFn().<String>asKeyedFn(),
+                Max.ofIntegers().<String>asKeyedFn(),
                 maxIntTag)
             .with(
                 new GetUserStringFunction(),
@@ -262,7 +260,7 @@ public class  CombineFnsTest {
         .apply(Combine.perKey(CombineFns.composeKeyed()
             .with(
                 new GetIntegerFunction(),
-                new MaxIntegerFn().<String>asKeyedFn(),
+                Max.ofIntegers().<String>asKeyedFn(),
                 maxIntTag)
             .with(
                 new GetUserStringFunction(),


[5/7] beam git commit: [BEAM-1223] Relaxed some visibility concerns for Sum*Fn, Min*Fn, Max*Fn and MeanFn until it's supported by Dataflow.

Posted by lc...@apache.org.
[BEAM-1223] Relaxed some visibility concerns for Sum*Fn, Min*Fn, Max*Fn and MeanFn until it's supported by Dataflow.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0697b059
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0697b059
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0697b059

Branch: refs/heads/master
Commit: 0697b05956e2d431c0ec9034e239f67eeff9c040
Parents: 488de7b
Author: Stas Levin <st...@gmail.com>
Authored: Tue Jan 3 20:59:22 2017 +0200
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Jan 3 14:29:05 2017 -0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/beam/sdk/transforms/Max.java    | 8 ++++----
 .../src/main/java/org/apache/beam/sdk/transforms/Mean.java   | 6 +++---
 .../src/main/java/org/apache/beam/sdk/transforms/Min.java    | 8 ++++----
 .../src/main/java/org/apache/beam/sdk/transforms/Sum.java    | 6 +++---
 4 files changed, 14 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0697b059/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
index 91851bc..69aaaf2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
@@ -212,7 +212,7 @@ public class Max {
 
   /////////////////////////////////////////////////////////////////////////////
 
-  private static class MaxFn<T> extends BinaryCombineFn<T> {
+  public static class MaxFn<T> extends BinaryCombineFn<T> {
 
     private final T identity;
     private final Comparator<? super T> comparator;
@@ -241,7 +241,7 @@ public class Max {
     }
   }
 
-  private static class MaxIntegerFn extends Combine.BinaryCombineIntegerFn {
+  public static class MaxIntegerFn extends Combine.BinaryCombineIntegerFn {
 
     @Override
     public int apply(int left, int right) {
@@ -254,7 +254,7 @@ public class Max {
     }
   }
 
-  private static class MaxLongFn extends Combine.BinaryCombineLongFn {
+  public static class MaxLongFn extends Combine.BinaryCombineLongFn {
 
     @Override
     public long apply(long left, long right) {
@@ -267,7 +267,7 @@ public class Max {
     }
   }
 
-  private static class MaxDoubleFn extends Combine.BinaryCombineDoubleFn {
+  public static class MaxDoubleFn extends Combine.BinaryCombineDoubleFn {
 
     @Override
     public double apply(double left, double right) {

http://git-wip-us.apache.org/repos/asf/beam/blob/0697b059/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
index 7e62e9d..d650f07 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
@@ -64,7 +64,7 @@ public class Mean {
    * @param <NumT> the type of the {@code Number}s being combined
    */
   public static <NumT extends Number> Combine.Globally<NumT, Double> globally() {
-    return Combine.<NumT, Double>globally(new MeanFn<>());
+    return Combine.<NumT, Double>globally(Mean.<NumT>of());
   }
 
   /**
@@ -81,7 +81,7 @@ public class Mean {
    * @param <NumT> the type of the {@code Number}s being combined
    */
   public static <K, NumT extends Number> Combine.PerKey<K, NumT, Double> perKey() {
-    return Combine.<K, NumT, Double>perKey(new MeanFn<>());
+    return Combine.<K, NumT, Double>perKey(Mean.<NumT>of());
   }
 
   /**
@@ -101,7 +101,7 @@ public class Mean {
 
   /////////////////////////////////////////////////////////////////////////////
 
-  private static class MeanFn<NumT extends Number>
+  public static class MeanFn<NumT extends Number>
   extends Combine.AccumulatingCombineFn<NumT, CountSum<NumT>, Double> {
     /**
      * Constructs a combining function that computes the mean over

http://git-wip-us.apache.org/repos/asf/beam/blob/0697b059/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
index 109f4e5..ce3b64a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
@@ -212,7 +212,7 @@ public class Min {
 
   /////////////////////////////////////////////////////////////////////////////
 
-  private static class MinFn<T> extends BinaryCombineFn<T> {
+  public static class MinFn<T> extends BinaryCombineFn<T> {
 
     private final T identity;
     private final Comparator<? super T> comparator;
@@ -241,7 +241,7 @@ public class Min {
     }
   }
 
-  private static class MinIntegerFn extends Combine.BinaryCombineIntegerFn {
+  public static class MinIntegerFn extends Combine.BinaryCombineIntegerFn {
 
     @Override
     public int apply(int left, int right) {
@@ -254,7 +254,7 @@ public class Min {
     }
   }
 
-  private static class MinLongFn extends Combine.BinaryCombineLongFn {
+  public static class MinLongFn extends Combine.BinaryCombineLongFn {
 
     @Override
     public long apply(long left, long right) {
@@ -267,7 +267,7 @@ public class Min {
     }
   }
 
-  private static class MinDoubleFn extends Combine.BinaryCombineDoubleFn {
+  public static class MinDoubleFn extends Combine.BinaryCombineDoubleFn {
 
     @Override
     public double apply(double left, double right) {

http://git-wip-us.apache.org/repos/asf/beam/blob/0697b059/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
index ccade4d..4e0f680 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
@@ -140,7 +140,7 @@ public class Sum {
 
   /////////////////////////////////////////////////////////////////////////////
 
-  private static class SumIntegerFn extends Combine.BinaryCombineIntegerFn {
+  public static class SumIntegerFn extends Combine.BinaryCombineIntegerFn {
 
     @Override
     public int apply(int a, int b) {
@@ -153,7 +153,7 @@ public class Sum {
     }
   }
 
-  private static class SumLongFn extends Combine.BinaryCombineLongFn {
+  public static class SumLongFn extends Combine.BinaryCombineLongFn {
 
     @Override
     public long apply(long a, long b) {
@@ -166,7 +166,7 @@ public class Sum {
     }
   }
 
-  private static class SumDoubleFn extends Combine.BinaryCombineDoubleFn {
+  public static class SumDoubleFn extends Combine.BinaryCombineDoubleFn {
 
     @Override
     public double apply(double a, double b) {


[4/7] beam git commit: [BEAM-1223] Fixed some javadoc.

Posted by lc...@apache.org.
[BEAM-1223] Fixed some javadoc.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/488de7b1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/488de7b1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/488de7b1

Branch: refs/heads/master
Commit: 488de7b1c6c973ea7f711839a436f9edc2094488
Parents: 6b68699
Author: Stas Levin <st...@gmail.com>
Authored: Tue Jan 3 20:48:42 2017 +0200
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Jan 3 14:28:21 2017 -0800

----------------------------------------------------------------------
 .../apache/beam/runners/spark/aggregators/NamedAggregators.java  | 4 ++--
 .../core/src/main/java/org/apache/beam/sdk/transforms/Min.java   | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/488de7b1/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
index 52fe994..b5aec32 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
@@ -153,9 +153,9 @@ public class NamedAggregators implements Serializable {
   }
 
   /**
-   * @param <InputT>    Input data type
+   * @param <InputT> Input data type
    * @param <InterT> Intermediate data type (useful for averages)
-   * @param <OutputT>   Output data type
+   * @param <OutputT> Output data type
    */
   public static class CombineFunctionState<InputT, InterT, OutputT>
       implements State<InputT, InterT, OutputT> {

http://git-wip-us.apache.org/repos/asf/beam/blob/488de7b1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
index 3518a35..109f4e5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
@@ -134,7 +134,7 @@ public class Min {
   }
 
   /**
-   * A {@code CombineFn} that computes the maximum of a collection of elements of type {@code T}
+   * A {@code CombineFn} that computes the minimum of a collection of elements of type {@code T}
    * using an arbitrary {@link Comparator} and an {@code identity},
    * useful as an argument to {@link Combine#globally} or {@link Combine#perKey}.
    *
@@ -146,7 +146,7 @@ public class Min {
   }
 
   /**
-   * A {@code CombineFn} that computes the maximum of a collection of elements of type {@code T}
+   * A {@code CombineFn} that computes the minimum of a collection of elements of type {@code T}
    * using an arbitrary {@link Comparator}, useful as an argument to {@link Combine#globally} or
    * {@link Combine#perKey}.
    *


[6/7] beam git commit: [BEAM-1223] Added the missing javadocs.

Posted by lc...@apache.org.
[BEAM-1223] Added the missing javadocs.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3e16e325
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3e16e325
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3e16e325

Branch: refs/heads/master
Commit: 3e16e325a6d88ac7b21bb9f79357b14dd280becd
Parents: 0697b05
Author: Stas Levin <st...@gmail.com>
Authored: Tue Jan 3 21:36:12 2017 +0200
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Jan 3 14:29:06 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/transforms/Max.java | 19 +++++++++++++++++++
 .../org/apache/beam/sdk/transforms/Mean.java     | 10 ++++++++++
 .../java/org/apache/beam/sdk/transforms/Min.java | 19 +++++++++++++++++++
 .../java/org/apache/beam/sdk/transforms/Sum.java | 15 +++++++++++++++
 4 files changed, 63 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3e16e325/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
index 69aaaf2..f6460b6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
@@ -212,6 +212,13 @@ public class Max {
 
   /////////////////////////////////////////////////////////////////////////////
 
+  /**
+   * A {@code CombineFn} that computes the maximum of a collection of elements of type {@code T}
+   * using an arbitrary {@link Comparator}, useful as an argument to {@link Combine#globally} or
+   * {@link Combine#perKey}.
+   *
+   * @param <T> the type of the values being compared
+   */
   public static class MaxFn<T> extends BinaryCombineFn<T> {
 
     private final T identity;
@@ -241,6 +248,10 @@ public class Max {
     }
   }
 
+  /**
+   * A {@code CombineFn} that computes the maximum of a collection of {@code Integer}s, useful as an
+   * argument to {@link Combine#globally} or {@link Combine#perKey}.
+   */
   public static class MaxIntegerFn extends Combine.BinaryCombineIntegerFn {
 
     @Override
@@ -254,6 +265,10 @@ public class Max {
     }
   }
 
+  /**
+   * A {@code CombineFn} that computes the maximum of a collection of {@code Long}s, useful as an
+   * argument to {@link Combine#globally} or {@link Combine#perKey}.
+   */
   public static class MaxLongFn extends Combine.BinaryCombineLongFn {
 
     @Override
@@ -267,6 +282,10 @@ public class Max {
     }
   }
 
+  /**
+   * A {@code CombineFn} that computes the maximum of a collection of {@code Double}s, useful as an
+   * argument to {@link Combine#globally} or {@link Combine#perKey}.
+   */
   public static class MaxDoubleFn extends Combine.BinaryCombineDoubleFn {
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/3e16e325/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
index d650f07..17dbe6c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
@@ -101,6 +101,16 @@ public class Mean {
 
   /////////////////////////////////////////////////////////////////////////////
 
+  /**
+   * A {@code Combine.CombineFn} that computes the arithmetic mean
+   * (a.k.a. average) of an {@code Iterable} of numbers of type
+   * {@code N}, useful as an argument to {@link Combine#globally} or
+   * {@link Combine#perKey}.
+   *
+   * <p>Returns {@code Double.NaN} if combining zero elements.
+   *
+   * @param <NumT> the type of the {@code Number}s being combined
+   */
   public static class MeanFn<NumT extends Number>
   extends Combine.AccumulatingCombineFn<NumT, CountSum<NumT>, Double> {
     /**

http://git-wip-us.apache.org/repos/asf/beam/blob/3e16e325/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
index ce3b64a..47d831c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
@@ -212,6 +212,13 @@ public class Min {
 
   /////////////////////////////////////////////////////////////////////////////
 
+  /**
+   * A {@code CombineFn} that computes the maximum of a collection of elements of type {@code T}
+   * using an arbitrary {@link Comparator}, useful as an argument to {@link Combine#globally} or
+   * {@link Combine#perKey}.
+   *
+   * @param <T> the type of the values being compared
+   */
   public static class MinFn<T> extends BinaryCombineFn<T> {
 
     private final T identity;
@@ -241,6 +248,10 @@ public class Min {
     }
   }
 
+  /**
+   * A {@code CombineFn} that computes the minimum of a collection of {@code Integer}s, useful as an
+   * argument to {@link Combine#globally} or {@link Combine#perKey}.
+   */
   public static class MinIntegerFn extends Combine.BinaryCombineIntegerFn {
 
     @Override
@@ -254,6 +265,10 @@ public class Min {
     }
   }
 
+  /**
+   * A {@code CombineFn} that computes the minimum of a collection of {@code Long}s, useful as an
+   * argument to {@link Combine#globally} or {@link Combine#perKey}.
+   */
   public static class MinLongFn extends Combine.BinaryCombineLongFn {
 
     @Override
@@ -267,6 +282,10 @@ public class Min {
     }
   }
 
+  /**
+   * A {@code CombineFn} that computes the minimum of a collection of {@code Double}s, useful as an
+   * argument to {@link Combine#globally} or {@link Combine#perKey}.
+   */
   public static class MinDoubleFn extends Combine.BinaryCombineDoubleFn {
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/3e16e325/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
index 4e0f680..5044732 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
@@ -140,6 +140,11 @@ public class Sum {
 
   /////////////////////////////////////////////////////////////////////////////
 
+  /**
+   * A {@code SerializableFunction} that computes the sum of an
+   * {@code Iterable} of {@code Integer}s, useful as an argument to
+   * {@link Combine#globally} or {@link Combine#perKey}.
+   */
   public static class SumIntegerFn extends Combine.BinaryCombineIntegerFn {
 
     @Override
@@ -153,6 +158,11 @@ public class Sum {
     }
   }
 
+  /**
+   * A {@code SerializableFunction} that computes the sum of an
+   * {@code Iterable} of {@code Long}s, useful as an argument to
+   * {@link Combine#globally} or {@link Combine#perKey}.
+   */
   public static class SumLongFn extends Combine.BinaryCombineLongFn {
 
     @Override
@@ -166,6 +176,11 @@ public class Sum {
     }
   }
 
+  /**
+   * A {@code SerializableFunction} that computes the sum of an
+   * {@code Iterable} of {@code Double}s, useful as an argument to
+   * {@link Combine#globally} or {@link Combine#perKey}.
+   */
   public static class SumDoubleFn extends Combine.BinaryCombineDoubleFn {
 
     @Override


[7/7] beam git commit: [BEAM-1223] Introduced Sum, Min, Max#ofLongs()/ofDoubles()/ofIntegers()

Posted by lc...@apache.org.
[BEAM-1223] Introduced Sum,Min,Max#ofLongs()/ofDoubles()/ofIntegers()

This closes #1723


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d86db15b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d86db15b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d86db15b

Branch: refs/heads/master
Commit: d86db15ba22cbd99093327dc4962e06fa2d5db43
Parents: e794f14 3e16e32
Author: Luke Cwik <lc...@google.com>
Authored: Tue Jan 3 14:36:24 2017 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Jan 3 14:36:24 2017 -0800

----------------------------------------------------------------------
 .../beam/examples/DebuggingWordCount.java       |  4 +-
 .../org/apache/beam/examples/WordCount.java     |  2 +-
 .../cookbook/CombinePerKeyExamples.java         |  2 +-
 .../beam/examples/complete/game/GameStats.java  |  2 +-
 .../beam/examples/complete/game/UserScore.java  |  2 +-
 .../runners/apex/examples/WordCountTest.java    |  2 +-
 .../utils/ApexStateInternalsTest.java           |  2 +-
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |  4 +-
 .../runners/core/GroupAlsoByWindowsDoFn.java    |  4 +-
 .../apache/beam/runners/core/NonEmptyPanes.java |  2 +-
 .../AfterDelayFromFirstElementStateMachine.java |  2 +-
 .../core/triggers/AfterPaneStateMachine.java    |  2 +-
 .../core/LateDataDroppingDoFnRunnerTest.java    |  2 +-
 .../beam/runners/core/ReduceFnRunnerTest.java   | 12 +--
 .../beam/runners/core/ReduceFnTester.java       |  2 +-
 .../GroupAlsoByWindowEvaluatorFactory.java      |  4 +-
 .../runners/direct/AggregatorContainerTest.java | 16 ++--
 .../CopyOnAccessInMemoryStateInternalsTest.java |  4 +-
 .../runners/direct/EvaluationContextTest.java   |  6 +-
 .../beam/runners/flink/examples/WordCount.java  |  2 +-
 .../flink/examples/streaming/AutoComplete.java  |  2 +-
 .../KafkaWindowedWordCountExample.java          |  2 +-
 .../examples/streaming/WindowedWordCount.java   |  2 +-
 .../streaming/FlinkStateInternalsTest.java      |  2 +-
 .../dataflow/DataflowPipelineJobTest.java       |  8 +-
 .../spark/aggregators/NamedAggregators.java     |  4 +-
 .../beam/runners/spark/examples/WordCount.java  |  2 +-
 .../translation/SparkGroupAlsoByWindowFn.java   |  2 +-
 .../spark/translation/SparkRuntimeContext.java  | 63 +++++----------
 .../ResumeFromCheckpointStreamingTest.java      |  2 +-
 .../streaming/utils/PAssertStreaming.java       |  4 +-
 .../apache/beam/sdk/io/PubsubUnboundedSink.java |  8 +-
 .../beam/sdk/io/PubsubUnboundedSource.java      |  5 +-
 .../org/apache/beam/sdk/testing/PAssert.java    | 12 +--
 .../org/apache/beam/sdk/transforms/Max.java     | 85 +++++++++++++++-----
 .../org/apache/beam/sdk/transforms/Mean.java    | 21 ++++-
 .../org/apache/beam/sdk/transforms/Min.java     | 83 ++++++++++++++-----
 .../org/apache/beam/sdk/transforms/Sum.java     | 44 ++++++++--
 .../windowing/AfterDelayFromFirstElement.java   |  2 +-
 .../sdk/transforms/windowing/AfterPane.java     |  2 +-
 .../sdk/AggregatorPipelineExtractorTest.java    | 16 ++--
 .../beam/sdk/transforms/CombineFnsTest.java     | 20 +++--
 .../apache/beam/sdk/transforms/DoFnTest.java    | 15 ++--
 .../beam/sdk/transforms/DoFnTesterTest.java     |  6 +-
 .../org/apache/beam/sdk/transforms/MaxTest.java |  6 +-
 .../apache/beam/sdk/transforms/MeanTest.java    |  2 +-
 .../org/apache/beam/sdk/transforms/MinTest.java |  6 +-
 .../beam/sdk/transforms/OldDoFnContextTest.java |  2 +-
 .../apache/beam/sdk/transforms/OldDoFnTest.java | 11 ++-
 .../beam/sdk/transforms/SimpleStatsFnsTest.java | 36 ++++-----
 .../org/apache/beam/sdk/transforms/SumTest.java | 12 +--
 .../apache/beam/sdk/transforms/ViewTest.java    |  2 +-
 .../apache/beam/sdk/util/CombineFnUtilTest.java |  8 +-
 .../util/state/InMemoryStateInternalsTest.java  |  2 +-
 .../beam/sdk/util/state/StateTagTest.java       | 11 ++-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |  2 +-
 56 files changed, 341 insertions(+), 249 deletions(-)
----------------------------------------------------------------------