You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/01/03 22:36:36 UTC
[1/7] beam git commit: [BEAM-1223] Reduced [Sum|Min|Max|Mean]*Fn
visibility so as to make them an internal implementation detail and prevent
external code from employing them for tasks such as pipeline translation.
Repository: beam
Updated Branches:
refs/heads/master e794f1486 -> d86db15ba
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
index 19b7c51..4e80154 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
@@ -28,7 +28,6 @@ import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Max.MaxIntegerFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.junit.Rule;
import org.junit.Test;
@@ -59,7 +58,7 @@ public class DoFnTest implements Serializable {
@Test
public void testCreateAggregatorWithCombinerSucceeds() {
String name = "testAggregator";
- Sum.SumLongFn combiner = new Sum.SumLongFn();
+ Combine.BinaryCombineLongFn combiner = Sum.ofLongs();
DoFn<Void, Void> doFn = new NoOpDoFn();
@@ -76,7 +75,7 @@ public class DoFnTest implements Serializable {
DoFn<Void, Void> doFn = new NoOpDoFn();
- doFn.createAggregator(null, new Sum.SumLongFn());
+ doFn.createAggregator(null, Sum.ofLongs());
}
@Test
@@ -106,7 +105,7 @@ public class DoFnTest implements Serializable {
@Test
public void testCreateAggregatorWithSameNameThrowsException() {
String name = "testAggregator";
- CombineFn<Double, ?, Double> combiner = new Max.MaxDoubleFn();
+ CombineFn<Double, ?, Double> combiner = Max.ofDoubles();
DoFn<Void, Void> doFn = new NoOpDoFn();
@@ -124,7 +123,7 @@ public class DoFnTest implements Serializable {
public void testCreateAggregatorsWithDifferentNamesSucceeds() {
String nameOne = "testAggregator";
String nameTwo = "aggregatorPrime";
- CombineFn<Double, ?, Double> combiner = new Max.MaxDoubleFn();
+ CombineFn<Double, ?, Double> combiner = Max.ofDoubles();
DoFn<Void, Void> doFn = new NoOpDoFn();
@@ -151,7 +150,7 @@ public class DoFnTest implements Serializable {
TestPipeline p = createTestPipeline(new DoFn<String, String>() {
@StartBundle
public void startBundle(Context c) {
- createAggregator("anyAggregate", new MaxIntegerFn());
+ createAggregator("anyAggregate", Max.ofIntegers());
}
@ProcessElement
@@ -170,7 +169,7 @@ public class DoFnTest implements Serializable {
TestPipeline p = createTestPipeline(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
- createAggregator("anyAggregate", new MaxIntegerFn());
+ createAggregator("anyAggregate", Max.ofIntegers());
}
});
@@ -186,7 +185,7 @@ public class DoFnTest implements Serializable {
TestPipeline p = createTestPipeline(new DoFn<String, String>() {
@FinishBundle
public void finishBundle(Context c) {
- createAggregator("anyAggregate", new MaxIntegerFn());
+ createAggregator("anyAggregate", Max.ofIntegers());
}
@ProcessElement
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
index 3859c9f..699687f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
@@ -404,11 +404,11 @@ public class DoFnTesterTest {
* {@link DoFn.ProcessElement @ProcessElement}.
*/
private static class CounterDoFn extends DoFn<Long, String> {
- Aggregator<Long, Long> agg = createAggregator("ctr", new Sum.SumLongFn());
+ Aggregator<Long, Long> agg = createAggregator("ctr", Sum.ofLongs());
Aggregator<Long, Long> startBundleCalls =
- createAggregator("startBundleCalls", new Sum.SumLongFn());
+ createAggregator("startBundleCalls", Sum.ofLongs());
Aggregator<Long, Long> finishBundleCalls =
- createAggregator("finishBundleCalls", new Sum.SumLongFn());
+ createAggregator("finishBundleCalls", Sum.ofLongs());
private enum LifecycleState {
UNINITIALIZED,
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
index 4aa39a3..2b43560 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
@@ -46,7 +46,7 @@ public class MaxTest {
@Test
public void testMaxIntegerFn() {
checkCombineFn(
- new Max.MaxIntegerFn(),
+ Max.ofIntegers(),
Lists.newArrayList(1, 2, 3, 4),
4);
}
@@ -54,7 +54,7 @@ public class MaxTest {
@Test
public void testMaxLongFn() {
checkCombineFn(
- new Max.MaxLongFn(),
+ Max.ofLongs(),
Lists.newArrayList(1L, 2L, 3L, 4L),
4L);
}
@@ -62,7 +62,7 @@ public class MaxTest {
@Test
public void testMaxDoubleFn() {
checkCombineFn(
- new Max.MaxDoubleFn(),
+ Max.ofDoubles(),
Lists.newArrayList(1.0, 2.0, 3.0, 4.0),
4.0);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java
index 84741ee..79ebc25 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java
@@ -65,7 +65,7 @@ public class MeanTest {
@Test
public void testMeanFn() throws Exception {
checkCombineFn(
- new Mean.MeanFn<Integer>(),
+ Mean.<Integer>of(),
Lists.newArrayList(1, 2, 3, 4),
2.5);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
index 4334ed9..e89b223 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
@@ -46,7 +46,7 @@ public class MinTest {
@Test
public void testMinIntegerFn() {
checkCombineFn(
- new Min.MinIntegerFn(),
+ Min.ofIntegers(),
Lists.newArrayList(1, 2, 3, 4),
1);
}
@@ -54,7 +54,7 @@ public class MinTest {
@Test
public void testMinLongFn() {
checkCombineFn(
- new Min.MinLongFn(),
+ Min.ofLongs(),
Lists.newArrayList(1L, 2L, 3L, 4L),
1L);
}
@@ -62,7 +62,7 @@ public class MinTest {
@Test
public void testMinDoubleFn() {
checkCombineFn(
- new Min.MinDoubleFn(),
+ Min.ofDoubles(),
Lists.newArrayList(1.0, 2.0, 3.0, 4.0),
1.0);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java
index b2d4aed..b5cb286 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java
@@ -55,7 +55,7 @@ public class OldDoFnContextTest {
@Test
public void testSetupDelegateAggregatorsCreatesAndLinksDelegateAggregators() {
- Sum.SumLongFn combiner = new Sum.SumLongFn();
+ Combine.BinaryCombineLongFn combiner = Sum.ofLongs();
Aggregator<Long, Long> delegateAggregator =
fn.createAggregator("test", combiner);
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
index cc84252..1c767b1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
@@ -26,7 +26,6 @@ import static org.junit.Assert.assertThat;
import java.io.Serializable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Max.MaxIntegerFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Instant;
@@ -48,7 +47,7 @@ public class OldDoFnTest implements Serializable {
@Test
public void testCreateAggregatorWithCombinerSucceeds() {
String name = "testAggregator";
- Sum.SumLongFn combiner = new Sum.SumLongFn();
+ Combine.BinaryCombineLongFn combiner = Sum.ofLongs();
OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
@@ -65,7 +64,7 @@ public class OldDoFnTest implements Serializable {
OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
- doFn.createAggregator(null, new Sum.SumLongFn());
+ doFn.createAggregator(null, Sum.ofLongs());
}
@Test
@@ -95,7 +94,7 @@ public class OldDoFnTest implements Serializable {
@Test
public void testCreateAggregatorWithSameNameThrowsException() {
String name = "testAggregator";
- CombineFn<Double, ?, Double> combiner = new Max.MaxDoubleFn();
+ CombineFn<Double, ?, Double> combiner = Max.ofDoubles();
OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
@@ -113,7 +112,7 @@ public class OldDoFnTest implements Serializable {
public void testCreateAggregatorsWithDifferentNamesSucceeds() {
String nameOne = "testAggregator";
String nameTwo = "aggregatorPrime";
- CombineFn<Double, ?, Double> combiner = new Max.MaxDoubleFn();
+ CombineFn<Double, ?, Double> combiner = Max.ofDoubles();
OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
@@ -135,7 +134,7 @@ public class OldDoFnTest implements Serializable {
context.setupDelegateAggregators();
thrown.expect(isA(IllegalStateException.class));
- fn.createAggregator("anyAggregate", new MaxIntegerFn());
+ fn.createAggregator("anyAggregate", Max.ofIntegers());
}
private OldDoFn<String, String>.Context createContext(OldDoFn<String, String> fn) {
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SimpleStatsFnsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SimpleStatsFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SimpleStatsFnsTest.java
index a782ecc..2fd86c6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SimpleStatsFnsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SimpleStatsFnsTest.java
@@ -78,31 +78,31 @@ public class SimpleStatsFnsTest {
@Test
public void testInstantStats() {
- assertEquals(new Instant(1000), Min.MinFn.<Instant>naturalOrder().apply(
+ assertEquals(new Instant(1000), Min.<Instant>naturalOrder().apply(
Arrays.asList(new Instant(1000), new Instant(2000))));
- assertEquals(null, Min.MinFn.<Instant>naturalOrder().apply(
+ assertEquals(null, Min.<Instant>naturalOrder().apply(
Collections.<Instant>emptyList()));
- assertEquals(new Instant(5000), Min.MinFn.<Instant>naturalOrder(new Instant(5000)).apply(
+ assertEquals(new Instant(5000), Min.<Instant>naturalOrder(new Instant(5000)).apply(
Collections.<Instant>emptyList()));
- assertEquals(new Instant(2000), Max.MaxFn.<Instant>naturalOrder().apply(
+ assertEquals(new Instant(2000), Max.<Instant>naturalOrder().apply(
Arrays.asList(new Instant(1000), new Instant(2000))));
- assertEquals(null, Max.MaxFn.<Instant>naturalOrder().apply(
+ assertEquals(null, Max.<Instant>naturalOrder().apply(
Collections.<Instant>emptyList()));
- assertEquals(new Instant(5000), Max.MaxFn.<Instant>naturalOrder(new Instant(5000)).apply(
+ assertEquals(new Instant(5000), Max.<Instant>naturalOrder(new Instant(5000)).apply(
Collections.<Instant>emptyList()));
}
@Test
public void testDoubleStats() {
for (TestCase<Double> t : DOUBLE_CASES) {
- assertEquals(t.sum, new Sum.SumDoubleFn().apply(t.data),
+ assertEquals(t.sum, Sum.ofDoubles().apply(t.data),
DOUBLE_COMPARISON_ACCURACY);
- assertEquals(t.min, new Min.MinDoubleFn().apply(t.data),
+ assertEquals(t.min, Min.ofDoubles().apply(t.data),
DOUBLE_COMPARISON_ACCURACY);
- assertEquals(t.max, new Max.MaxDoubleFn().apply(t.data),
+ assertEquals(t.max, Max.ofDoubles().apply(t.data),
DOUBLE_COMPARISON_ACCURACY);
- assertEquals(t.mean, new Mean.MeanFn<Double>().apply(t.data),
+ assertEquals(t.mean, Mean.<Double>of().apply(t.data),
DOUBLE_COMPARISON_ACCURACY);
}
}
@@ -110,20 +110,20 @@ public class SimpleStatsFnsTest {
@Test
public void testIntegerStats() {
for (TestCase<Integer> t : INTEGER_CASES) {
- assertEquals(t.sum, new Sum.SumIntegerFn().apply(t.data));
- assertEquals(t.min, new Min.MinIntegerFn().apply(t.data));
- assertEquals(t.max, new Max.MaxIntegerFn().apply(t.data));
- assertEquals(t.mean, new Mean.MeanFn<Integer>().apply(t.data));
+ assertEquals(t.sum, Sum.ofIntegers().apply(t.data));
+ assertEquals(t.min, Min.ofIntegers().apply(t.data));
+ assertEquals(t.max, Max.ofIntegers().apply(t.data));
+ assertEquals(t.mean, Mean.<Integer>of().apply(t.data));
}
}
@Test
public void testLongStats() {
for (TestCase<Long> t : LONG_CASES) {
- assertEquals(t.sum, new Sum.SumLongFn().apply(t.data));
- assertEquals(t.min, new Min.MinLongFn().apply(t.data));
- assertEquals(t.max, new Max.MaxLongFn().apply(t.data));
- assertEquals(t.mean, new Mean.MeanFn<Long>().apply(t.data));
+ assertEquals(t.sum, Sum.ofLongs().apply(t.data));
+ assertEquals(t.min, Min.ofLongs().apply(t.data));
+ assertEquals(t.max, Max.ofLongs().apply(t.data));
+ assertEquals(t.mean, Mean.<Long>of().apply(t.data));
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java
index 04c0186..b2f8aa8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java
@@ -52,7 +52,7 @@ public class SumTest {
@Test
public void testSumIntegerFn() {
checkCombineFn(
- new Sum.SumIntegerFn(),
+ Sum.ofIntegers(),
Lists.newArrayList(1, 2, 3, 4),
10);
}
@@ -60,7 +60,7 @@ public class SumTest {
@Test
public void testSumLongFn() {
checkCombineFn(
- new Sum.SumLongFn(),
+ Sum.ofLongs(),
Lists.newArrayList(1L, 2L, 3L, 4L),
10L);
}
@@ -68,14 +68,14 @@ public class SumTest {
@Test
public void testSumDoubleFn() {
checkCombineFn(
- new Sum.SumDoubleFn(),
+ Sum.ofDoubles(),
Lists.newArrayList(1.0, 2.0, 3.0, 4.0),
10.0);
}
@Test
public void testGetAccumulatorCoderEquals() {
- Sum.SumIntegerFn sumIntegerFn = new Sum.SumIntegerFn();
+ Combine.BinaryCombineIntegerFn sumIntegerFn = Sum.ofIntegers();
assertEquals(
sumIntegerFn.getAccumulatorCoder(STANDARD_REGISTRY, VarIntCoder.of()),
sumIntegerFn.getAccumulatorCoder(STANDARD_REGISTRY, VarIntCoder.of()));
@@ -83,7 +83,7 @@ public class SumTest {
sumIntegerFn.getAccumulatorCoder(STANDARD_REGISTRY, VarIntCoder.of()),
sumIntegerFn.getAccumulatorCoder(STANDARD_REGISTRY, BigEndianIntegerCoder.of()));
- Sum.SumLongFn sumLongFn = new Sum.SumLongFn();
+ Combine.BinaryCombineLongFn sumLongFn = Sum.ofLongs();
assertEquals(
sumLongFn.getAccumulatorCoder(STANDARD_REGISTRY, VarLongCoder.of()),
sumLongFn.getAccumulatorCoder(STANDARD_REGISTRY, VarLongCoder.of()));
@@ -91,7 +91,7 @@ public class SumTest {
sumLongFn.getAccumulatorCoder(STANDARD_REGISTRY, VarLongCoder.of()),
sumLongFn.getAccumulatorCoder(STANDARD_REGISTRY, BigEndianLongCoder.of()));
- Sum.SumDoubleFn sumDoubleFn = new Sum.SumDoubleFn();
+ Combine.BinaryCombineDoubleFn sumDoubleFn = Sum.ofDoubles();
assertEquals(
sumDoubleFn.getAccumulatorCoder(STANDARD_REGISTRY, DoubleCoder.of()),
sumDoubleFn.getAccumulatorCoder(STANDARD_REGISTRY, DoubleCoder.of()));
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
index 1d8b32c..0249ac9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
@@ -1120,7 +1120,7 @@ public class ViewTest implements Serializable {
final PCollectionView<Map<String, Integer>> view =
pipeline.apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("a", 20), KV.of("b", 3)))
- .apply("SumIntegers", Combine.perKey(new Sum.SumIntegerFn().<String>asKeyedFn()))
+ .apply("SumIntegers", Combine.perKey(Sum.ofIntegers().<String>asKeyedFn()))
.apply(View.<String, Integer>asMap());
PCollection<KV<String, Integer>> output =
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java
index fe81275..36a90e9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java
@@ -70,18 +70,18 @@ public class CombineFnUtilTest {
@Test
public void testToFnWithContextIdempotent() throws Exception {
CombineFnWithContext<Integer, int[], Integer> fnWithContext =
- CombineFnUtil.toFnWithContext(new Sum.SumIntegerFn());
+ CombineFnUtil.toFnWithContext(Sum.ofIntegers());
assertTrue(fnWithContext == CombineFnUtil.toFnWithContext(fnWithContext));
KeyedCombineFnWithContext<Object, Integer, int[], Integer> keyedFnWithContext =
- CombineFnUtil.toFnWithContext(new Sum.SumIntegerFn().asKeyedFn());
+ CombineFnUtil.toFnWithContext(Sum.ofIntegers().asKeyedFn());
assertTrue(keyedFnWithContext == CombineFnUtil.toFnWithContext(keyedFnWithContext));
}
@Test
public void testToFnWithContext() throws Exception {
CombineFnWithContext<Integer, int[], Integer> fnWithContext =
- CombineFnUtil.toFnWithContext(new Sum.SumIntegerFn());
+ CombineFnUtil.toFnWithContext(Sum.ofIntegers());
List<Integer> inputs = ImmutableList.of(1, 2, 3, 4);
Context nullContext = CombineContextFactory.nullContext();
int[] accum = fnWithContext.createAccumulator(nullContext);
@@ -91,7 +91,7 @@ public class CombineFnUtilTest {
assertEquals(10, fnWithContext.extractOutput(accum, nullContext).intValue());
KeyedCombineFnWithContext<String, Integer, int[], Integer> keyedFnWithContext =
- CombineFnUtil.toFnWithContext(new Sum.SumIntegerFn().<String>asKeyedFn());
+ CombineFnUtil.toFnWithContext(Sum.ofIntegers().<String>asKeyedFn());
String key = "key";
accum = keyedFnWithContext.createAccumulator(key, nullContext);
for (Integer i : inputs) {
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryStateInternalsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryStateInternalsTest.java
index 08a6bc1..e43ad36 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryStateInternalsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryStateInternalsTest.java
@@ -48,7 +48,7 @@ public class InMemoryStateInternalsTest {
StateTags.value("stringValue", StringUtf8Coder.of());
private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>>
SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
- "sumInteger", VarIntCoder.of(), new Sum.SumIntegerFn());
+ "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
StateTags.bag("stringBag", StringUtf8Coder.of());
private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/StateTagTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/StateTagTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/StateTagTest.java
index 2c8c9cc..7c06dbd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/StateTagTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/StateTagTest.java
@@ -24,10 +24,9 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Max;
-import org.apache.beam.sdk.transforms.Max.MaxIntegerFn;
import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.Min.MinIntegerFn;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.junit.Test;
@@ -86,10 +85,10 @@ public class StateTagTest {
@SuppressWarnings({"unchecked", "rawtypes"})
@Test
public void testCombiningValueEquality() {
- MaxIntegerFn maxFn = new Max.MaxIntegerFn();
+ Combine.BinaryCombineIntegerFn maxFn = Max.ofIntegers();
Coder<Integer> input1 = VarIntCoder.of();
Coder<Integer> input2 = BigEndianIntegerCoder.of();
- MinIntegerFn minFn = new Min.MinIntegerFn();
+ Combine.BinaryCombineIntegerFn minFn = Min.ofIntegers();
StateTag<?, ?> fooCoder1Max1 = StateTags.combiningValueFromInputInternal("foo", input1, maxFn);
StateTag<?, ?> fooCoder1Max2 = StateTags.combiningValueFromInputInternal("foo", input1, maxFn);
@@ -129,8 +128,8 @@ public class StateTagTest {
CoderRegistry registry = new CoderRegistry();
registry.registerStandardCoders();
- MaxIntegerFn maxFn = new Max.MaxIntegerFn();
- MinIntegerFn minFn = new Min.MinIntegerFn();
+ Combine.BinaryCombineIntegerFn maxFn = Max.ofIntegers();
+ Combine.BinaryCombineIntegerFn minFn = Min.ofIntegers();
Coder<int[]> accum1 = maxFn.getAccumulatorCoder(registry, VarIntCoder.of());
Coder<int[]> accum2 = minFn.getAccumulatorCoder(registry, BigEndianIntegerCoder.of());
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 3a36add..7efa115 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -2577,7 +2577,7 @@ public class BigQueryIO {
/** Tracks bytes written, exposed as "ByteCount" Counter. */
private Aggregator<Long, Long> byteCountAggregator =
- createAggregator("ByteCount", new Sum.SumLongFn());
+ createAggregator("ByteCount", Sum.ofLongs());
/** Constructor. */
StreamingWriteFn(ValueProvider<TableSchema> schema, BigQueryServices bqServices) {
[3/7] beam git commit: [BEAM-1223] Resolved some type-safety related
warnings.
Posted by lc...@apache.org.
[BEAM-1223] Resolved some type-safety related warnings.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6b68699c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6b68699c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6b68699c
Branch: refs/heads/master
Commit: 6b68699cd75e8b17bc84cf58053fe1f7583e447f
Parents: 78a360e
Author: Stas Levin <st...@gmail.com>
Authored: Mon Jan 2 18:29:21 2017 +0200
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Jan 3 14:28:19 2017 -0800
----------------------------------------------------------------------
.../core/src/main/java/org/apache/beam/sdk/transforms/Max.java | 4 ++--
.../core/src/main/java/org/apache/beam/sdk/transforms/Min.java | 4 ++--
2 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/6b68699c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
index 696bed9..91851bc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
@@ -195,7 +195,7 @@ public class Max {
*/
public static <T, ComparatorT extends Comparator<? super T> & Serializable>
Combine.Globally<T, T> globally(ComparatorT comparator) {
- return Combine.<T, T>globally(Max.of(comparator));
+ return Combine.<T, T>globally(Max.<T, ComparatorT>of(comparator));
}
/**
@@ -207,7 +207,7 @@ public class Max {
*/
public static <K, T, ComparatorT extends Comparator<? super T> & Serializable>
Combine.PerKey<K, T, T> perKey(ComparatorT comparator) {
- return Combine.<K, T, T>perKey(Max.of(comparator));
+ return Combine.<K, T, T>perKey(Max.<T, ComparatorT>of(comparator));
}
/////////////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/beam/blob/6b68699c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
index b208929..3518a35 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
@@ -195,7 +195,7 @@ public class Min {
*/
public static <T, ComparatorT extends Comparator<? super T> & Serializable>
Combine.Globally<T, T> globally(ComparatorT comparator) {
- return Combine.<T, T>globally(Min.of(comparator));
+ return Combine.<T, T>globally(Min.<T, ComparatorT>of(comparator));
}
/**
@@ -207,7 +207,7 @@ public class Min {
*/
public static <K, T, ComparatorT extends Comparator<? super T> & Serializable>
Combine.PerKey<K, T, T> perKey(ComparatorT comparator) {
- return Combine.<K, T, T>perKey(Min.of(comparator));
+ return Combine.<K, T, T>perKey(Min.<T, ComparatorT>of(comparator));
}
/////////////////////////////////////////////////////////////////////////////
[2/7] beam git commit: [BEAM-1223] Reduced [Sum|Min|Max|Mean]*Fn
visibility so as to make them an internal implementation detail and prevent
external code from employing them for tasks such as pipeline translation.
Posted by lc...@apache.org.
[BEAM-1223] Reduced [Sum|Min|Max|Mean]*Fn visibility so as to make them an internal implementation detail and prevent external code from employing them for tasks such as pipeline translation.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/78a360ea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/78a360ea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/78a360ea
Branch: refs/heads/master
Commit: 78a360eac35507d9a558fc6117bb56b67b8a884e
Parents: e794f14
Author: Stas Levin <st...@gmail.com>
Authored: Sun Jan 1 13:58:32 2017 +0200
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Jan 3 14:25:26 2017 -0800
----------------------------------------------------------------------
.../beam/examples/DebuggingWordCount.java | 4 +-
.../org/apache/beam/examples/WordCount.java | 2 +-
.../cookbook/CombinePerKeyExamples.java | 2 +-
.../beam/examples/complete/game/GameStats.java | 2 +-
.../beam/examples/complete/game/UserScore.java | 2 +-
.../runners/apex/examples/WordCountTest.java | 2 +-
.../utils/ApexStateInternalsTest.java | 2 +-
.../core/GroupAlsoByWindowViaWindowSetDoFn.java | 4 +-
.../runners/core/GroupAlsoByWindowsDoFn.java | 4 +-
.../apache/beam/runners/core/NonEmptyPanes.java | 2 +-
.../AfterDelayFromFirstElementStateMachine.java | 2 +-
.../core/triggers/AfterPaneStateMachine.java | 2 +-
.../core/LateDataDroppingDoFnRunnerTest.java | 2 +-
.../beam/runners/core/ReduceFnRunnerTest.java | 12 +-
.../beam/runners/core/ReduceFnTester.java | 2 +-
.../GroupAlsoByWindowEvaluatorFactory.java | 4 +-
.../runners/direct/AggregatorContainerTest.java | 16 +--
.../CopyOnAccessInMemoryStateInternalsTest.java | 4 +-
.../runners/direct/EvaluationContextTest.java | 6 +-
.../beam/runners/flink/examples/WordCount.java | 2 +-
.../flink/examples/streaming/AutoComplete.java | 2 +-
.../KafkaWindowedWordCountExample.java | 2 +-
.../examples/streaming/WindowedWordCount.java | 2 +-
.../streaming/FlinkStateInternalsTest.java | 2 +-
.../dataflow/DataflowPipelineJobTest.java | 8 +-
.../spark/aggregators/NamedAggregators.java | 4 +-
.../beam/runners/spark/examples/WordCount.java | 2 +-
.../translation/SparkGroupAlsoByWindowFn.java | 2 +-
.../spark/translation/SparkRuntimeContext.java | 63 ++++-------
.../ResumeFromCheckpointStreamingTest.java | 2 +-
.../streaming/utils/PAssertStreaming.java | 4 +-
.../apache/beam/sdk/io/PubsubUnboundedSink.java | 8 +-
.../beam/sdk/io/PubsubUnboundedSource.java | 5 +-
.../org/apache/beam/sdk/testing/PAssert.java | 12 +-
.../org/apache/beam/sdk/transforms/Max.java | 112 +++++++++++--------
.../org/apache/beam/sdk/transforms/Mean.java | 13 ++-
.../org/apache/beam/sdk/transforms/Min.java | 110 ++++++++++--------
.../org/apache/beam/sdk/transforms/Sum.java | 57 ++++++----
.../windowing/AfterDelayFromFirstElement.java | 2 +-
.../sdk/transforms/windowing/AfterPane.java | 2 +-
.../sdk/AggregatorPipelineExtractorTest.java | 16 +--
.../beam/sdk/transforms/CombineFnsTest.java | 20 ++--
.../apache/beam/sdk/transforms/DoFnTest.java | 15 ++-
.../beam/sdk/transforms/DoFnTesterTest.java | 6 +-
.../org/apache/beam/sdk/transforms/MaxTest.java | 6 +-
.../apache/beam/sdk/transforms/MeanTest.java | 2 +-
.../org/apache/beam/sdk/transforms/MinTest.java | 6 +-
.../beam/sdk/transforms/OldDoFnContextTest.java | 2 +-
.../apache/beam/sdk/transforms/OldDoFnTest.java | 11 +-
.../beam/sdk/transforms/SimpleStatsFnsTest.java | 36 +++---
.../org/apache/beam/sdk/transforms/SumTest.java | 12 +-
.../apache/beam/sdk/transforms/ViewTest.java | 2 +-
.../apache/beam/sdk/util/CombineFnUtilTest.java | 8 +-
.../util/state/InMemoryStateInternalsTest.java | 2 +-
.../beam/sdk/util/state/StateTagTest.java | 11 +-
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 2 +-
56 files changed, 339 insertions(+), 310 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
index f7c537c..997d590 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
@@ -95,9 +95,9 @@ public class DebuggingWordCount {
* in a dashboard, etc.
*/
private final Aggregator<Long, Long> matchedWords =
- createAggregator("matchedWords", new Sum.SumLongFn());
+ createAggregator("matchedWords", Sum.ofLongs());
private final Aggregator<Long, Long> unmatchedWords =
- createAggregator("umatchedWords", new Sum.SumLongFn());
+ createAggregator("umatchedWords", Sum.ofLongs());
@ProcessElement
public void processElement(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
index 224d7db..7e21d47 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
@@ -87,7 +87,7 @@ public class WordCount {
*/
static class ExtractWordsFn extends DoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", new Sum.SumLongFn());
+ createAggregator("emptyLines", Sum.ofLongs());
@ProcessElement
public void processElement(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
index 29655ea..37f9d79 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
@@ -80,7 +80,7 @@ public class CombinePerKeyExamples {
*/
static class ExtractLargeWordsFn extends DoFn<TableRow, KV<String, String>> {
private final Aggregator<Long, Long> smallerWords =
- createAggregator("smallerWords", new Sum.SumLongFn());
+ createAggregator("smallerWords", Sum.ofLongs());
@ProcessElement
public void processElement(ProcessContext c){
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index 6ad6a23..74f1b30 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -127,7 +127,7 @@ public class GameStats extends LeaderBoard {
.withSideInputs(globalMeanScore)
.of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
private final Aggregator<Long, Long> numSpammerUsers =
- createAggregator("SpammerUsers", new Sum.SumLongFn());
+ createAggregator("SpammerUsers", Sum.ofLongs());
@ProcessElement
public void processElement(ProcessContext c) {
Integer score = c.element().getValue();
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
index cb81a7e..7dd5a8e 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
@@ -126,7 +126,7 @@ public class UserScore {
// Log and count parse errors.
private static final Logger LOG = LoggerFactory.getLogger(ParseEventFn.class);
private final Aggregator<Long, Long> numParseErrors =
- createAggregator("ParseErrors", new Sum.SumLongFn());
+ createAggregator("ParseErrors", Sum.ofLongs());
@ProcessElement
public void processElement(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
index 28bb8ad..a1713ac 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
@@ -68,7 +68,7 @@ public class WordCountTest {
static class ExtractWordsFn extends DoFn<String, String> {
private static final long serialVersionUID = 1L;
private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", new Sum.SumLongFn());
+ createAggregator("emptyLines", Sum.ofLongs());
@ProcessElement
public void processElement(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
index 1801358..4d797f1 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
@@ -62,7 +62,7 @@ public class ApexStateInternalsTest {
StateTags.value("stringValue", StringUtf8Coder.of());
private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>>
SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
- "sumInteger", VarIntCoder.of(), new Sum.SumIntegerFn());
+ "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
StateTags.bag("stringBag", StringUtf8Coder.of());
private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index 14171b3..d79683a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -50,9 +50,9 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
protected final Aggregator<Long, Long> droppedDueToClosedWindow =
createAggregator(
- GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, new Sum.SumLongFn());
+ GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
protected final Aggregator<Long, Long> droppedDueToLateness =
- createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, new Sum.SumLongFn());
+ createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
private final WindowingStrategy<Object, W> windowingStrategy;
private final StateInternalsFactory<K> stateInternalsFactory;
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
index 1b32d84..9a2f8fd 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
@@ -41,7 +41,7 @@ public abstract class GroupAlsoByWindowsDoFn<K, InputT, OutputT, W extends Bound
public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness";
protected final Aggregator<Long, Long> droppedDueToClosedWindow =
- createAggregator(DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, new Sum.SumLongFn());
+ createAggregator(DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
protected final Aggregator<Long, Long> droppedDueToLateness =
- createAggregator(DROPPED_DUE_TO_LATENESS_COUNTER, new Sum.SumLongFn());
+ createAggregator(DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
index 3e51dfb..0a6fd93 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
@@ -121,7 +121,7 @@ public abstract class NonEmptyPanes<K, W extends BoundedWindow> {
private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Long>>
PANE_ADDITIONS_TAG =
StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
- "count", VarLongCoder.of(), new Sum.SumLongFn()));
+ "count", VarLongCoder.of(), Sum.ofLongs()));
@Override
public void recordContent(StateAccessor<K> state) {
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
index c8922df..b60c690 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
@@ -56,7 +56,7 @@ public abstract class AfterDelayFromFirstElementStateMachine extends OnceTrigger
protected static final StateTag<Object, AccumulatorCombiningState<Instant,
Combine.Holder<Instant>, Instant>> DELAYED_UNTIL_TAG =
StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
- "delayed", InstantCoder.of(), Min.MinFn.<Instant>naturalOrder()));
+ "delayed", InstantCoder.of(), Min.<Instant>naturalOrder()));
private static final PeriodFormatter PERIOD_FORMATTER = PeriodFormat.wordBased(Locale.ENGLISH);
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
index 38b95f9..d8ad370 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
@@ -38,7 +38,7 @@ public class AfterPaneStateMachine extends OnceTriggerStateMachine {
private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Long>>
ELEMENTS_IN_PANE_TAG =
StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
- "count", VarLongCoder.of(), new Sum.SumLongFn()));
+ "count", VarLongCoder.of(), Sum.ofLongs()));
private final int countElems;
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
index 3cd5d4a..efe2044 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
@@ -114,7 +114,7 @@ public class LateDataDroppingDoFnRunnerTest {
@Override
public CombineFn<Long, ?, Long> getCombineFn() {
- return new Sum.SumLongFn();
+ return Sum.ofLongs();
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 4abfc9a..1bd717f 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -217,7 +217,7 @@ public class ReduceFnRunnerTest {
ReduceFnTester.combining(
strategy,
mockTriggerStateMachine,
- new Sum.SumIntegerFn().<String>asKeyedFn(),
+ Sum.ofIntegers().<String>asKeyedFn(),
VarIntCoder.of());
injectElement(tester, 2);
@@ -290,7 +290,7 @@ public class ReduceFnRunnerTest {
ReduceFnTester<Integer, Integer, IntervalWindow> tester =
ReduceFnTester
- .combining(strategy, new Sum.SumIntegerFn().<String>asKeyedFn(), VarIntCoder.of());
+ .combining(strategy, Sum.ofIntegers().<String>asKeyedFn(), VarIntCoder.of());
tester.injectElements(TimestampedValue.of(13, elementTimestamp));
@@ -322,7 +322,7 @@ public class ReduceFnRunnerTest {
ReduceFnTester.combining(
strategy,
mockTriggerStateMachine,
- new Sum.SumIntegerFn().<String>asKeyedFn(),
+ Sum.ofIntegers().<String>asKeyedFn(),
VarIntCoder.of());
injectElement(tester, 1);
@@ -1069,7 +1069,7 @@ public class ReduceFnRunnerTest {
SlidingWindows.of(Duration.millis(100)).every(Duration.millis(30)))
.withTrigger(AfterWatermark.pastEndOfWindow())
.withAllowedLateness(Duration.millis(1000)),
- new Sum.SumIntegerFn().<String>asKeyedFn(), VarIntCoder.of());
+ Sum.ofIntegers().<String>asKeyedFn(), VarIntCoder.of());
tester.injectElements(
// assigned to [-60, 40), [-30, 70), [0, 100)
@@ -1212,7 +1212,7 @@ public class ReduceFnRunnerTest {
ReduceFnTester<Integer, Integer, IntervalWindow> tester =
ReduceFnTester
- .combining(strategy, new Sum.SumIntegerFn().<String>asKeyedFn(), VarIntCoder.of());
+ .combining(strategy, Sum.ofIntegers().<String>asKeyedFn(), VarIntCoder.of());
tester.advanceInputWatermark(new Instant(0));
tester.advanceProcessingTime(new Instant(0));
@@ -1268,7 +1268,7 @@ public class ReduceFnRunnerTest {
ReduceFnTester<Integer, Integer, IntervalWindow> tester =
ReduceFnTester
- .combining(strategy, new Sum.SumIntegerFn().<String>asKeyedFn(), VarIntCoder.of());
+ .combining(strategy, Sum.ofIntegers().<String>asKeyedFn(), VarIntCoder.of());
tester.advanceInputWatermark(new Instant(0));
tester.advanceProcessingTime(new Instant(0));
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 890195a..226f5f0 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -638,7 +638,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
@Override
public CombineFn<Long, ?, Long> getCombineFn() {
- return new Sum.SumLongFn();
+ return Sum.ofLongs();
}
public long getSum() {
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index bb11923..2bc0d8d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -147,10 +147,10 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
reduceFn = SystemReduceFn.buffering(valueCoder);
droppedDueToClosedWindow = aggregatorChanges.createSystemAggregator(stepContext,
GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER,
- new Sum.SumLongFn());
+ Sum.ofLongs());
droppedDueToLateness = aggregatorChanges.createSystemAggregator(stepContext,
GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER,
- new Sum.SumLongFn());
+ Sum.ofLongs());
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
index f770800..37524eb 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
@@ -26,7 +26,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.core.ExecutionContext.StepContext;
import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Sum.SumIntegerFn;
+import org.apache.beam.sdk.transforms.Sum;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -61,13 +61,13 @@ public class AggregatorContainerTest {
@Test
public void addsAggregatorsOnCommit() {
AggregatorContainer.Mutator mutator = container.createMutator();
- mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", new SumIntegerFn()).addValue(5);
+ mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(5);
mutator.commit();
assertThat((Integer) container.getAggregate(STEP_NAME, "sum_int"), equalTo(5));
mutator = container.createMutator();
- mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", new SumIntegerFn()).addValue(8);
+ mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(8);
assertThat("Shouldn't update value until commit",
(Integer) container.getAggregate(STEP_NAME, "sum_int"), equalTo(5));
@@ -81,14 +81,14 @@ public class AggregatorContainerTest {
mutator.commit();
thrown.expect(IllegalStateException.class);
- mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", new SumIntegerFn()).addValue(5);
+ mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(5);
}
@Test
public void failToAddValueAfterCommit() {
AggregatorContainer.Mutator mutator = container.createMutator();
Aggregator<Integer, ?> aggregator =
- mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", new SumIntegerFn());
+ mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", Sum.ofIntegers());
mutator.commit();
thrown.expect(IllegalStateException.class);
@@ -99,12 +99,12 @@ public class AggregatorContainerTest {
public void failToAddValueAfterCommitWithPrevious() {
AggregatorContainer.Mutator mutator = container.createMutator();
mutator.createAggregatorForDoFn(
- fn, stepContext, "sum_int", new SumIntegerFn()).addValue(5);
+ fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(5);
mutator.commit();
mutator = container.createMutator();
Aggregator<Integer, ?> aggregator = mutator.createAggregatorForDoFn(
- fn, stepContext, "sum_int", new SumIntegerFn());
+ fn, stepContext, "sum_int", Sum.ofIntegers());
mutator.commit();
thrown.expect(IllegalStateException.class);
@@ -123,7 +123,7 @@ public class AggregatorContainerTest {
@Override
public void run() {
mutator.createAggregatorForDoFn(
- fn, stepContext, "sum_int", new SumIntegerFn()).addValue(value);
+ fn, stepContext, "sum_int", Sum.ofIntegers()).addValue(value);
mutator.commit();
}
});
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
index 12ef66c..4284f3e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
@@ -167,7 +167,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
public void testAccumulatorCombiningStateWithUnderlying() throws CannotProvideCoderException {
CopyOnAccessInMemoryStateInternals<String> underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
- CombineFn<Long, long[], Long> sumLongFn = new Sum.SumLongFn();
+ CombineFn<Long, long[], Long> sumLongFn = Sum.ofLongs();
StateNamespace namespace = new StateNamespaceForTest("foo");
CoderRegistry reg = pipeline.getCoderRegistry();
@@ -197,7 +197,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
public void testKeyedAccumulatorCombiningStateWithUnderlying() throws Exception {
CopyOnAccessInMemoryStateInternals<String> underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
- KeyedCombineFn<String, Long, long[], Long> sumLongFn = new Sum.SumLongFn().asKeyedFn();
+ KeyedCombineFn<String, Long, long[], Long> sumLongFn = Sum.ofLongs().asKeyedFn();
StateNamespace namespace = new StateNamespaceForTest("foo");
CoderRegistry reg = pipeline.getCoderRegistry();
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 15340da..ad6e32d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -45,7 +45,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.Sum.SumLongFn;
+import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -250,7 +250,7 @@ public class EvaluationContextTest {
"STEP", createdProducer.getTransform().getName());
AggregatorContainer container = context.getAggregatorContainer();
AggregatorContainer.Mutator mutator = container.createMutator();
- mutator.createAggregatorForDoFn(fn, stepContext, "foo", new SumLongFn()).addValue(4L);
+ mutator.createAggregatorForDoFn(fn, stepContext, "foo", Sum.ofLongs()).addValue(4L);
TransformResult<?> result =
StepTransformResult.withoutHold(createdProducer)
@@ -260,7 +260,7 @@ public class EvaluationContextTest {
assertThat((Long) context.getAggregatorContainer().getAggregate("STEP", "foo"), equalTo(4L));
AggregatorContainer.Mutator mutatorAgain = container.createMutator();
- mutatorAgain.createAggregatorForDoFn(fn, stepContext, "foo", new SumLongFn()).addValue(12L);
+ mutatorAgain.createAggregatorForDoFn(fn, stepContext, "foo", Sum.ofLongs()).addValue(12L);
TransformResult<?> secondResult =
StepTransformResult.withoutHold(downstreamProducer)
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
index b6b3c1a..6ae4cf8 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
@@ -46,7 +46,7 @@ public class WordCount {
*/
public static class ExtractWordsFn extends DoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", new Sum.SumLongFn());
+ createAggregator("emptyLines", Sum.ofLongs());
@ProcessElement
public void processElement(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
index 3405981..f33e616 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
@@ -314,7 +314,7 @@ public class AutoComplete {
static class ExtractWordsFn extends DoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", new Sum.SumLongFn());
+ createAggregator("emptyLines", Sum.ofLongs());
@ProcessElement
public void processElement(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
index 42c42f3..ee0e874 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
@@ -55,7 +55,7 @@ public class KafkaWindowedWordCountExample {
*/
public static class ExtractWordsFn extends DoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", new Sum.SumLongFn());
+ createAggregator("emptyLines", Sum.ofLongs());
@ProcessElement
public void processElement(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
index 2246bdd..792c214 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
@@ -66,7 +66,7 @@ public class WindowedWordCount {
static class ExtractWordsFn extends DoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", new Sum.SumLongFn());
+ createAggregator("emptyLines", Sum.ofLongs());
@ProcessElement
public void processElement(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
index 628212a..126f611 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
@@ -70,7 +70,7 @@ public class FlinkStateInternalsTest {
StateTags.value("stringValue", StringUtf8Coder.of());
private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>>
SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
- "sumInteger", VarIntCoder.of(), new Sum.SumIntegerFn());
+ "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
StateTags.bag("stringBag", StringUtf8Coder.of());
private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 1890da1..6999e03 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -420,7 +420,7 @@ public class DataflowPipelineJobTest {
@Test
public void testGetAggregatorValuesWithSingleMetricUpdateReturnsSingletonCollection()
throws IOException, AggregatorRetrievalException {
- CombineFn<Long, long[], Long> combineFn = new Sum.SumLongFn();
+ CombineFn<Long, long[], Long> combineFn = Sum.ofLongs();
String aggregatorName = "agg";
Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
@SuppressWarnings("unchecked")
@@ -468,7 +468,7 @@ public class DataflowPipelineJobTest {
@Test
public void testGetAggregatorValuesWithMultipleMetricUpdatesReturnsCollection()
throws IOException, AggregatorRetrievalException {
- CombineFn<Long, long[], Long> combineFn = new Sum.SumLongFn();
+ CombineFn<Long, long[], Long> combineFn = Sum.ofLongs();
String aggregatorName = "agg";
Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
@@ -536,7 +536,7 @@ public class DataflowPipelineJobTest {
@Test
public void testGetAggregatorValuesWithUnrelatedMetricUpdateIgnoresUpdate()
throws IOException, AggregatorRetrievalException {
- CombineFn<Long, long[], Long> combineFn = new Sum.SumLongFn();
+ CombineFn<Long, long[], Long> combineFn = Sum.ofLongs();
String aggregatorName = "agg";
Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
@SuppressWarnings("unchecked")
@@ -600,7 +600,7 @@ public class DataflowPipelineJobTest {
@Test
public void testGetAggregatorValuesWhenClientThrowsExceptionThrowsAggregatorRetrievalException()
throws IOException, AggregatorRetrievalException {
- CombineFn<Long, long[], Long> combineFn = new Sum.SumLongFn();
+ CombineFn<Long, long[], Long> combineFn = Sum.ofLongs();
String aggregatorName = "agg";
Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
index 4e96466..52fe994 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
@@ -153,7 +153,9 @@ public class NamedAggregators implements Serializable {
}
/**
- * => combineFunction in data flow.
+ * @param <InputT> Input data type
+ * @param <InterT> Intermediate data type (useful for averages)
+ * @param <OutputT> Output data type
*/
public static class CombineFunctionState<InputT, InterT, OutputT>
implements State<InputT, InterT, OutputT> {
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
index 1252d12..da14ee2 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
@@ -46,7 +46,7 @@ public class WordCount {
*/
static class ExtractWordsFn extends DoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", new Sum.SumLongFn());
+ createAggregator("emptyLines", Sum.ofLongs());
@ProcessElement
public void processElement(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
index 5432d58..b615132 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
@@ -78,7 +78,7 @@ public class SparkGroupAlsoByWindowFn<K, InputT, W extends BoundedWindow>
droppedDueToClosedWindow = runtimeContext.createAggregator(
accumulator,
GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER,
- new Sum.SumLongFn());
+ Sum.ofLongs());
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
index 01b6b54..9c3d79f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
@@ -32,10 +32,6 @@ import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Max;
-import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.spark.Accumulator;
/**
@@ -92,18 +88,26 @@ public class SparkRuntimeContext implements Serializable {
Combine.CombineFn<? super InputT, InterT, OutputT> combineFn) {
@SuppressWarnings("unchecked")
Aggregator<InputT, OutputT> aggregator = (Aggregator<InputT, OutputT>) aggregators.get(named);
- if (aggregator == null) {
- @SuppressWarnings("unchecked")
- NamedAggregators.CombineFunctionState<InputT, InterT, OutputT> state =
- new NamedAggregators.CombineFunctionState<>(
- (Combine.CombineFn<InputT, InterT, OutputT>) combineFn,
- (Coder<InputT>) getCoder(combineFn),
- this);
- accum.add(new NamedAggregators(named, state));
- aggregator = new SparkAggregator<>(named, state);
- aggregators.put(named, aggregator);
+ try {
+ if (aggregator == null) {
+ @SuppressWarnings("unchecked")
+ final
+ NamedAggregators.CombineFunctionState<InputT, InterT, OutputT> state =
+ new NamedAggregators.CombineFunctionState<>(
+ (Combine.CombineFn<InputT, InterT, OutputT>) combineFn,
+ // hidden assumption: InputT == OutputT
+ (Coder<InputT>) getCoderRegistry().getCoder(combineFn.getOutputType()),
+ this);
+
+ accum.add(new NamedAggregators(named, state));
+ aggregator = new SparkAggregator<>(named, state);
+ aggregators.put(named, aggregator);
+ }
+ return aggregator;
+ } catch (CannotProvideCoderException e) {
+ throw new RuntimeException(String.format("Unable to create an aggregator named: [%s]", named),
+ e);
}
- return aggregator;
}
public CoderRegistry getCoderRegistry() {
@@ -114,35 +118,6 @@ public class SparkRuntimeContext implements Serializable {
return coderRegistry;
}
- private Coder<?> getCoder(Combine.CombineFn<?, ?, ?> combiner) {
- try {
- if (combiner.getClass() == Sum.SumIntegerFn.class) {
- return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
- } else if (combiner.getClass() == Sum.SumLongFn.class) {
- return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
- } else if (combiner.getClass() == Sum.SumDoubleFn.class) {
- return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
- } else if (combiner.getClass() == Min.MinIntegerFn.class) {
- return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
- } else if (combiner.getClass() == Min.MinLongFn.class) {
- return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
- } else if (combiner.getClass() == Min.MinDoubleFn.class) {
- return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
- } else if (combiner.getClass() == Max.MaxIntegerFn.class) {
- return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
- } else if (combiner.getClass() == Max.MaxLongFn.class) {
- return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
- } else if (combiner.getClass() == Max.MaxDoubleFn.class) {
- return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
- } else {
- throw new IllegalArgumentException("unsupported combiner in Aggregator: "
- + combiner.getClass().getName());
- }
- } catch (CannotProvideCoderException e) {
- throw new IllegalStateException("Could not determine default coder for combiner", e);
- }
- }
-
/**
* Initialize spark aggregators exactly once.
*
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index 2718b5f..ab04c5c 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -182,7 +182,7 @@ public class ResumeFromCheckpointStreamingTest {
private static class FormatAsText extends DoFn<KV<String, String>, String> {
private final Aggregator<Long, Long> aggregator =
- createAggregator("processedMessages", new Sum.SumLongFn());
+ createAggregator("processedMessages", Sum.ofLongs());
@ProcessElement
public void process(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
index 0284b3d..cd9de92 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
@@ -97,9 +97,9 @@ public final class PAssertStreaming implements Serializable {
private static class AssertDoFn<T> extends DoFn<Iterable<T>, Void> {
private final Aggregator<Integer, Integer> success =
- createAggregator(PAssert.SUCCESS_COUNTER, new Sum.SumIntegerFn());
+ createAggregator(PAssert.SUCCESS_COUNTER, Sum.ofIntegers());
private final Aggregator<Integer, Integer> failure =
- createAggregator(PAssert.FAILURE_COUNTER, new Sum.SumIntegerFn());
+ createAggregator(PAssert.FAILURE_COUNTER, Sum.ofIntegers());
private final T[] expected;
AssertDoFn(T[] expected) {
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
index 58414c6..75f6b7d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
@@ -154,7 +154,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
*/
private static class ShardFn<T> extends DoFn<T, KV<Integer, OutgoingMessage>> {
private final Aggregator<Long, Long> elementCounter =
- createAggregator("elements", new Sum.SumLongFn());
+ createAggregator("elements", Sum.ofLongs());
private final Coder<T> elementCoder;
private final int numShards;
private final RecordIdMethod recordIdMethod;
@@ -219,11 +219,11 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
private transient PubsubClient pubsubClient;
private final Aggregator<Long, Long> batchCounter =
- createAggregator("batches", new Sum.SumLongFn());
+ createAggregator("batches", Sum.ofLongs());
private final Aggregator<Long, Long> elementCounter =
- createAggregator("elements", new Sum.SumLongFn());
+ createAggregator("elements", Sum.ofLongs());
private final Aggregator<Long, Long> byteCounter =
- createAggregator("bytes", new Sum.SumLongFn());
+ createAggregator("bytes", Sum.ofLongs());
WriterFn(
PubsubClientFactory pubsubFactory, ValueProvider<TopicPath> topic,
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
index da3b437..4b3792d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
@@ -59,7 +59,6 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.Sum.SumLongFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -216,7 +215,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
}
};
- private static final Combine.BinaryCombineLongFn SUM = new SumLongFn();
+ private static final Combine.BinaryCombineLongFn SUM = Sum.ofLongs();
// ================================================================================
// Checkpoint
@@ -1159,7 +1158,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
private static class StatsFn<T> extends DoFn<T, T> {
private final Aggregator<Long, Long> elementCounter =
- createAggregator("elements", new Sum.SumLongFn());
+ createAggregator("elements", Sum.ofLongs());
private final PubsubClientFactory pubsubFactory;
private final ValueProvider<SubscriptionPath> subscription;
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index b23f4f3..b57f4a9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -1019,9 +1019,9 @@ public class PAssert {
private static class SideInputCheckerDoFn<ActualT> extends DoFn<Integer, Void> {
private final SerializableFunction<ActualT, Void> checkerFn;
private final Aggregator<Integer, Integer> success =
- createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
+ createAggregator(SUCCESS_COUNTER, Sum.ofIntegers());
private final Aggregator<Integer, Integer> failure =
- createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
+ createAggregator(FAILURE_COUNTER, Sum.ofIntegers());
private final PCollectionView<ActualT> actual;
private SideInputCheckerDoFn(
@@ -1054,9 +1054,9 @@ public class PAssert {
private static class GroupedValuesCheckerDoFn<ActualT> extends DoFn<ActualT, Void> {
private final SerializableFunction<ActualT, Void> checkerFn;
private final Aggregator<Integer, Integer> success =
- createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
+ createAggregator(SUCCESS_COUNTER, Sum.ofIntegers());
private final Aggregator<Integer, Integer> failure =
- createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
+ createAggregator(FAILURE_COUNTER, Sum.ofIntegers());
private GroupedValuesCheckerDoFn(SerializableFunction<ActualT, Void> checkerFn) {
this.checkerFn = checkerFn;
@@ -1079,9 +1079,9 @@ public class PAssert {
private static class SingletonCheckerDoFn<ActualT> extends DoFn<Iterable<ActualT>, Void> {
private final SerializableFunction<ActualT, Void> checkerFn;
private final Aggregator<Integer, Integer> success =
- createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
+ createAggregator(SUCCESS_COUNTER, Sum.ofIntegers());
private final Aggregator<Integer, Integer> failure =
- createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
+ createAggregator(FAILURE_COUNTER, Sum.ofIntegers());
private SingletonCheckerDoFn(SerializableFunction<ActualT, Void> checkerFn) {
this.checkerFn = checkerFn;
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
index 0990ca4..696bed9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
@@ -110,13 +110,69 @@ public class Max {
}
/**
+ * A {@code CombineFn} that computes the maximum of a collection of {@code Integer}s, useful as an
+ * argument to {@link Combine#globally} or {@link Combine#perKey}.
+ */
+ public static Combine.BinaryCombineIntegerFn ofIntegers() {
+ return new Max.MaxIntegerFn();
+ }
+
+ /**
+ * A {@code CombineFn} that computes the maximum of a collection of {@code Long}s, useful as an
+ * argument to {@link Combine#globally} or {@link Combine#perKey}.
+ */
+ public static Combine.BinaryCombineLongFn ofLongs() {
+ return new Max.MaxLongFn();
+ }
+
+ /**
+ * A {@code CombineFn} that computes the maximum of a collection of {@code Double}s, useful as an
+ * argument to {@link Combine#globally} or {@link Combine#perKey}.
+ */
+ public static Combine.BinaryCombineDoubleFn ofDoubles() {
+ return new Max.MaxDoubleFn();
+ }
+
+ /**
+ * A {@code CombineFn} that computes the maximum of a collection of elements of type {@code T}
+ * using an arbitrary {@link Comparator} and {@code identity},
+ * useful as an argument to {@link Combine#globally} or {@link Combine#perKey}.
+ *
+ * @param <T> the type of the values being compared
+ */
+ public static <T, ComparatorT extends Comparator<? super T> & Serializable>
+ BinaryCombineFn<T> of(final T identity, final ComparatorT comparator) {
+ return new MaxFn<T>(identity, comparator);
+ }
+
+ /**
+ * A {@code CombineFn} that computes the maximum of a collection of elements of type {@code T}
+ * using an arbitrary {@link Comparator}, useful as an argument to {@link Combine#globally} or
+ * {@link Combine#perKey}.
+ *
+ * @param <T> the type of the values being compared
+ */
+ public static <T, ComparatorT extends Comparator<? super T> & Serializable>
+ BinaryCombineFn<T> of(final ComparatorT comparator) {
+ return new MaxFn<T>(null, comparator);
+ }
+
+ public static <T extends Comparable<? super T>> BinaryCombineFn<T> naturalOrder(T identity) {
+ return new MaxFn<T>(identity, new Top.Largest<T>());
+ }
+
+ public static <T extends Comparable<? super T>> BinaryCombineFn<T> naturalOrder() {
+ return new MaxFn<T>(null, new Top.Largest<T>());
+ }
+
+ /**
* Returns a {@code PTransform} that takes an input {@code PCollection<T>} and returns a {@code
* PCollection<T>} whose contents is the maximum according to the natural ordering of {@code T}
* of the input {@code PCollection}'s elements, or {@code null} if there are no elements.
*/
public static <T extends Comparable<? super T>>
Combine.Globally<T, T> globally() {
- return Combine.<T, T>globally(MaxFn.<T>naturalOrder());
+ return Combine.<T, T>globally(Max.<T>naturalOrder());
}
/**
@@ -129,7 +185,7 @@ public class Max {
*/
public static <K, T extends Comparable<? super T>>
Combine.PerKey<K, T, T> perKey() {
- return Combine.<K, T, T>perKey(MaxFn.<T>naturalOrder());
+ return Combine.<K, T, T>perKey(Max.<T>naturalOrder());
}
/**
@@ -139,7 +195,7 @@ public class Max {
*/
public static <T, ComparatorT extends Comparator<? super T> & Serializable>
Combine.Globally<T, T> globally(ComparatorT comparator) {
- return Combine.<T, T>globally(MaxFn.of(comparator));
+ return Combine.<T, T>globally(Max.of(comparator));
}
/**
@@ -151,19 +207,12 @@ public class Max {
*/
public static <K, T, ComparatorT extends Comparator<? super T> & Serializable>
Combine.PerKey<K, T, T> perKey(ComparatorT comparator) {
- return Combine.<K, T, T>perKey(MaxFn.of(comparator));
+ return Combine.<K, T, T>perKey(Max.of(comparator));
}
/////////////////////////////////////////////////////////////////////////////
- /**
- * A {@code CombineFn} that computes the maximum of a collection of elements of type {@code T}
- * using an arbitrary {@link Comparator}, useful as an argument to {@link Combine#globally} or
- * {@link Combine#perKey}.
- *
- * @param <T> the type of the values being compared
- */
- public static class MaxFn<T> extends BinaryCombineFn<T> {
+ private static class MaxFn<T> extends BinaryCombineFn<T> {
private final T identity;
private final Comparator<? super T> comparator;
@@ -174,24 +223,6 @@ public class Max {
this.comparator = comparator;
}
- public static <T, ComparatorT extends Comparator<? super T> & Serializable>
- MaxFn<T> of(T identity, ComparatorT comparator) {
- return new MaxFn<T>(identity, comparator);
- }
-
- public static <T, ComparatorT extends Comparator<? super T> & Serializable>
- MaxFn<T> of(ComparatorT comparator) {
- return new MaxFn<T>(null, comparator);
- }
-
- public static <T extends Comparable<? super T>> MaxFn<T> naturalOrder(T identity) {
- return new MaxFn<T>(identity, new Top.Largest<T>());
- }
-
- public static <T extends Comparable<? super T>> MaxFn<T> naturalOrder() {
- return new MaxFn<T>(null, new Top.Largest<T>());
- }
-
@Override
public T identity() {
return identity;
@@ -210,11 +241,8 @@ public class Max {
}
}
- /**
- * A {@code CombineFn} that computes the maximum of a collection of {@code Integer}s, useful as an
- * argument to {@link Combine#globally} or {@link Combine#perKey}.
- */
- public static class MaxIntegerFn extends Combine.BinaryCombineIntegerFn {
+ private static class MaxIntegerFn extends Combine.BinaryCombineIntegerFn {
+
@Override
public int apply(int left, int right) {
return left >= right ? left : right;
@@ -226,11 +254,8 @@ public class Max {
}
}
- /**
- * A {@code CombineFn} that computes the maximum of a collection of {@code Long}s, useful as an
- * argument to {@link Combine#globally} or {@link Combine#perKey}.
- */
- public static class MaxLongFn extends Combine.BinaryCombineLongFn {
+ private static class MaxLongFn extends Combine.BinaryCombineLongFn {
+
@Override
public long apply(long left, long right) {
return left >= right ? left : right;
@@ -242,11 +267,8 @@ public class Max {
}
}
- /**
- * A {@code CombineFn} that computes the maximum of a collection of {@code Double}s, useful as an
- * argument to {@link Combine#globally} or {@link Combine#perKey}.
- */
- public static class MaxDoubleFn extends Combine.BinaryCombineDoubleFn {
+ private static class MaxDoubleFn extends Combine.BinaryCombineDoubleFn {
+
@Override
public double apply(double left, double right) {
return left >= right ? left : right;
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
index cb77ba3..7e62e9d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
@@ -84,9 +84,6 @@ public class Mean {
return Combine.<K, NumT, Double>perKey(new MeanFn<>());
}
-
- /////////////////////////////////////////////////////////////////////////////
-
/**
* A {@code Combine.CombineFn} that computes the arithmetic mean
* (a.k.a. average) of an {@code Iterable} of numbers of type
@@ -97,13 +94,19 @@ public class Mean {
*
* @param <NumT> the type of the {@code Number}s being combined
*/
- static class MeanFn<NumT extends Number>
+ public static <NumT extends Number>
+ Combine.AccumulatingCombineFn<NumT, CountSum<NumT>, Double> of() {
+ return new MeanFn<>();
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
+ private static class MeanFn<NumT extends Number>
extends Combine.AccumulatingCombineFn<NumT, CountSum<NumT>, Double> {
/**
* Constructs a combining function that computes the mean over
* a collection of values of type {@code N}.
*/
- public MeanFn() {}
@Override
public CountSum<NumT> createAccumulator() {
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
index 5003594..b208929 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
@@ -110,13 +110,69 @@ public class Min {
}
/**
+ * A {@code CombineFn} that computes the minimum of a collection of {@code Integer}s, useful as an
+ * argument to {@link Combine#globally} or {@link Combine#perKey}.
+ */
+ public static Combine.BinaryCombineIntegerFn ofIntegers() {
+ return new Min.MinIntegerFn();
+ }
+
+ /**
+ * A {@code CombineFn} that computes the minimum of a collection of {@code Long}s, useful as an
+ * argument to {@link Combine#globally} or {@link Combine#perKey}.
+ */
+ public static Combine.BinaryCombineLongFn ofLongs() {
+ return new Min.MinLongFn();
+ }
+
+ /**
+ * A {@code CombineFn} that computes the minimum of a collection of {@code Double}s, useful as an
+ * argument to {@link Combine#globally} or {@link Combine#perKey}.
+ */
+ public static Combine.BinaryCombineDoubleFn ofDoubles() {
+ return new Min.MinDoubleFn();
+ }
+
+ /**
+ * A {@code CombineFn} that computes the minimum of a collection of elements of type {@code T}
+ * using an arbitrary {@link Comparator} and an {@code identity},
+ * useful as an argument to {@link Combine#globally} or {@link Combine#perKey}.
+ *
+ * @param <T> the type of the values being compared
+ */
+ public static <T, ComparatorT extends Comparator<? super T> & Serializable>
+ BinaryCombineFn<T> of(T identity, ComparatorT comparator) {
+ return new MinFn<T>(identity, comparator);
+ }
+
+ /**
+ * A {@code CombineFn} that computes the minimum of a collection of elements of type {@code T}
+ * using an arbitrary {@link Comparator}, useful as an argument to {@link Combine#globally} or
+ * {@link Combine#perKey}.
+ *
+ * @param <T> the type of the values being compared
+ */
+ public static <T, ComparatorT extends Comparator<? super T> & Serializable>
+ BinaryCombineFn<T> of(ComparatorT comparator) {
+ return new MinFn<T>(null, comparator);
+ }
+
+ public static <T extends Comparable<? super T>> BinaryCombineFn<T> naturalOrder(T identity) {
+ return new MinFn<T>(identity, new Top.Largest<T>());
+ }
+
+ public static <T extends Comparable<? super T>> BinaryCombineFn<T> naturalOrder() {
+ return new MinFn<T>(null, new Top.Largest<T>());
+ }
+
+ /**
* Returns a {@code PTransform} that takes an input {@code PCollection<T>} and returns a {@code
* PCollection<T>} whose contents is the minimum according to the natural ordering of {@code T}
* of the input {@code PCollection}'s elements, or {@code null} if there are no elements.
*/
public static <T extends Comparable<? super T>>
Combine.Globally<T, T> globally() {
- return Combine.<T, T>globally(MinFn.<T>naturalOrder());
+ return Combine.<T, T>globally(Min.<T>naturalOrder());
}
/**
@@ -129,7 +185,7 @@ public class Min {
*/
public static <K, T extends Comparable<? super T>>
Combine.PerKey<K, T, T> perKey() {
- return Combine.<K, T, T>perKey(MinFn.<T>naturalOrder());
+ return Combine.<K, T, T>perKey(Min.<T>naturalOrder());
}
/**
@@ -139,7 +195,7 @@ public class Min {
*/
public static <T, ComparatorT extends Comparator<? super T> & Serializable>
Combine.Globally<T, T> globally(ComparatorT comparator) {
- return Combine.<T, T>globally(MinFn.of(comparator));
+ return Combine.<T, T>globally(Min.of(comparator));
}
/**
@@ -151,19 +207,12 @@ public class Min {
*/
public static <K, T, ComparatorT extends Comparator<? super T> & Serializable>
Combine.PerKey<K, T, T> perKey(ComparatorT comparator) {
- return Combine.<K, T, T>perKey(MinFn.of(comparator));
+ return Combine.<K, T, T>perKey(Min.of(comparator));
}
/////////////////////////////////////////////////////////////////////////////
- /**
- * A {@code CombineFn} that computes the maximum of a collection of elements of type {@code T}
- * using an arbitrary {@link Comparator}, useful as an argument to {@link Combine#globally} or
- * {@link Combine#perKey}.
- *
- * @param <T> the type of the values being compared
- */
- public static class MinFn<T> extends BinaryCombineFn<T> {
+ private static class MinFn<T> extends BinaryCombineFn<T> {
private final T identity;
private final Comparator<? super T> comparator;
@@ -174,24 +223,6 @@ public class Min {
this.comparator = comparator;
}
- public static <T, ComparatorT extends Comparator<? super T> & Serializable>
- MinFn<T> of(T identity, ComparatorT comparator) {
- return new MinFn<T>(identity, comparator);
- }
-
- public static <T, ComparatorT extends Comparator<? super T> & Serializable>
- MinFn<T> of(ComparatorT comparator) {
- return new MinFn<T>(null, comparator);
- }
-
- public static <T extends Comparable<? super T>> MinFn<T> naturalOrder(T identity) {
- return new MinFn<T>(identity, new Top.Largest<T>());
- }
-
- public static <T extends Comparable<? super T>> MinFn<T> naturalOrder() {
- return new MinFn<T>(null, new Top.Largest<T>());
- }
-
@Override
public T identity() {
return identity;
@@ -210,11 +241,7 @@ public class Min {
}
}
- /**
- * A {@code CombineFn} that computes the minimum of a collection of {@code Integer}s, useful as an
- * argument to {@link Combine#globally} or {@link Combine#perKey}.
- */
- public static class MinIntegerFn extends Combine.BinaryCombineIntegerFn {
+ private static class MinIntegerFn extends Combine.BinaryCombineIntegerFn {
@Override
public int apply(int left, int right) {
@@ -227,11 +254,8 @@ public class Min {
}
}
- /**
- * A {@code CombineFn} that computes the minimum of a collection of {@code Long}s, useful as an
- * argument to {@link Combine#globally} or {@link Combine#perKey}.
- */
- public static class MinLongFn extends Combine.BinaryCombineLongFn {
+ private static class MinLongFn extends Combine.BinaryCombineLongFn {
+
@Override
public long apply(long left, long right) {
return left <= right ? left : right;
@@ -243,11 +267,7 @@ public class Min {
}
}
- /**
- * A {@code CombineFn} that computes the minimum of a collection of {@code Double}s, useful as an
- * argument to {@link Combine#globally} or {@link Combine#perKey}.
- */
- public static class MinDoubleFn extends Combine.BinaryCombineDoubleFn {
+ private static class MinDoubleFn extends Combine.BinaryCombineDoubleFn {
@Override
public double apply(double left, double right) {
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
index 48eafc3..ccade4d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
@@ -50,7 +50,7 @@ public class Sum {
* {@code 0} if there are no elements.
*/
public static Combine.Globally<Integer, Integer> integersGlobally() {
- return Combine.globally(new SumIntegerFn());
+ return Combine.globally(Sum.ofIntegers());
}
/**
@@ -62,7 +62,7 @@ public class Sum {
* that key in the input {@code PCollection}.
*/
public static <K> Combine.PerKey<K, Integer, Integer> integersPerKey() {
- return Combine.<K, Integer, Integer>perKey(new SumIntegerFn());
+ return Combine.<K, Integer, Integer>perKey(Sum.ofIntegers());
}
/**
@@ -73,7 +73,7 @@ public class Sum {
* {@code 0} if there are no elements.
*/
public static Combine.Globally<Long, Long> longsGlobally() {
- return Combine.globally(new SumLongFn());
+ return Combine.globally(Sum.ofLongs());
}
/**
@@ -85,7 +85,7 @@ public class Sum {
* that key in the input {@code PCollection}.
*/
public static <K> Combine.PerKey<K, Long, Long> longsPerKey() {
- return Combine.<K, Long, Long>perKey(new SumLongFn());
+ return Combine.<K, Long, Long>perKey(Sum.ofLongs());
}
/**
@@ -96,7 +96,7 @@ public class Sum {
* {@code 0} if there are no elements.
*/
public static Combine.Globally<Double, Double> doublesGlobally() {
- return Combine.globally(new SumDoubleFn());
+ return Combine.globally(Sum.ofDoubles());
}
/**
@@ -108,18 +108,40 @@ public class Sum {
* that key in the input {@code PCollection}.
*/
public static <K> Combine.PerKey<K, Double, Double> doublesPerKey() {
- return Combine.<K, Double, Double>perKey(new SumDoubleFn());
+ return Combine.<K, Double, Double>perKey(Sum.ofDoubles());
}
+ /**
+ * A {@code SerializableFunction} that computes the sum of an
+ * {@code Iterable} of {@code Integer}s, useful as an argument to
+ * {@link Combine#globally} or {@link Combine#perKey}.
+ */
+ public static Combine.BinaryCombineIntegerFn ofIntegers() {
+ return new SumIntegerFn();
+ }
- /////////////////////////////////////////////////////////////////////////////
+ /**
+ * A {@code SerializableFunction} that computes the sum of an
+ * {@code Iterable} of {@code Double}s, useful as an argument to
+ * {@link Combine#globally} or {@link Combine#perKey}.
+ */
+ public static Combine.BinaryCombineDoubleFn ofDoubles() {
+ return new SumDoubleFn();
+ }
/**
* A {@code SerializableFunction} that computes the sum of an
- * {@code Iterable} of {@code Integer}s, useful as an argument to
+ * {@code Iterable} of {@code Long}s, useful as an argument to
* {@link Combine#globally} or {@link Combine#perKey}.
*/
- public static class SumIntegerFn extends Combine.BinaryCombineIntegerFn {
+ public static Combine.BinaryCombineLongFn ofLongs() {
+ return new SumLongFn();
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
+ private static class SumIntegerFn extends Combine.BinaryCombineIntegerFn {
+
@Override
public int apply(int a, int b) {
return a + b;
@@ -131,13 +153,8 @@ public class Sum {
}
}
- /**
- * A {@code SerializableFunction} that computes the sum of an
- * {@code Iterable} of {@code Long}s, useful as an argument to
- * {@link Combine#globally} or {@link Combine#perKey}.
- */
- public static class SumLongFn
- extends Combine.BinaryCombineLongFn {
+ private static class SumLongFn extends Combine.BinaryCombineLongFn {
+
@Override
public long apply(long a, long b) {
return a + b;
@@ -149,12 +166,8 @@ public class Sum {
}
}
- /**
- * A {@code SerializableFunction} that computes the sum of an
- * {@code Iterable} of {@code Double}s, useful as an argument to
- * {@link Combine#globally} or {@link Combine#perKey}.
- */
- public static class SumDoubleFn extends Combine.BinaryCombineDoubleFn {
+ private static class SumDoubleFn extends Combine.BinaryCombineDoubleFn {
+
@Override
public double apply(double a, double b) {
return a + b;
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
index 9daecb2..6392fb5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
@@ -51,7 +51,7 @@ public abstract class AfterDelayFromFirstElement extends OnceTrigger {
protected static final StateTag<Object, AccumulatorCombiningState<Instant,
Combine.Holder<Instant>, Instant>> DELAYED_UNTIL_TAG =
StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
- "delayed", InstantCoder.of(), Min.MinFn.<Instant>naturalOrder()));
+ "delayed", InstantCoder.of(), Min.<Instant>naturalOrder()));
private static final PeriodFormatter PERIOD_FORMATTER = PeriodFormat.wordBased(Locale.ENGLISH);
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
index 4a706e6..d66e1c4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
@@ -37,7 +37,7 @@ public class AfterPane extends OnceTrigger {
private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Long>>
ELEMENTS_IN_PANE_TAG =
StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
- "count", VarLongCoder.of(), new Sum.SumLongFn()));
+ "count", VarLongCoder.of(), Sum.ofLongs()));
private final int countElems;
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
index 1bf2c3d..22efd85 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
@@ -70,8 +70,8 @@ public class AggregatorPipelineExtractorTest {
AggregatorProvidingDoFn<ThreadGroup, StrictMath> fn = new AggregatorProvidingDoFn<>();
when(bound.getFn()).thenReturn(fn);
- Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn());
- Aggregator<Integer, Integer> aggregatorTwo = fn.addAggregator(new Min.MinIntegerFn());
+ Aggregator<Long, Long> aggregatorOne = fn.addAggregator(Sum.ofLongs());
+ Aggregator<Integer, Integer> aggregatorTwo = fn.addAggregator(Min.ofIntegers());
TransformHierarchy.Node transformNode = mock(TransformHierarchy.Node.class);
when(transformNode.getTransform()).thenReturn(bound);
@@ -98,8 +98,8 @@ public class AggregatorPipelineExtractorTest {
AggregatorProvidingDoFn<Object, Void> fn = new AggregatorProvidingDoFn<>();
when(bound.getFn()).thenReturn(fn);
- Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Max.MaxLongFn());
- Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(new Min.MinDoubleFn());
+ Aggregator<Long, Long> aggregatorOne = fn.addAggregator(Max.ofLongs());
+ Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(Min.ofDoubles());
TransformHierarchy.Node transformNode = mock(TransformHierarchy.Node.class);
when(transformNode.getTransform()).thenReturn(bound);
@@ -129,8 +129,8 @@ public class AggregatorPipelineExtractorTest {
when(bound.getFn()).thenReturn(fn);
when(otherBound.getFn()).thenReturn(fn);
- Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn());
- Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(new Min.MinDoubleFn());
+ Aggregator<Long, Long> aggregatorOne = fn.addAggregator(Sum.ofLongs());
+ Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(Min.ofDoubles());
TransformHierarchy.Node transformNode = mock(TransformHierarchy.Node.class);
when(transformNode.getTransform()).thenReturn(bound);
@@ -160,7 +160,7 @@ public class AggregatorPipelineExtractorTest {
ParDo.Bound bound = mock(ParDo.Bound.class, "Bound");
AggregatorProvidingDoFn<ThreadGroup, Void> fn = new AggregatorProvidingDoFn<>();
- Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn());
+ Aggregator<Long, Long> aggregatorOne = fn.addAggregator(Sum.ofLongs());
when(bound.getFn()).thenReturn(fn);
@@ -168,7 +168,7 @@ public class AggregatorPipelineExtractorTest {
ParDo.BoundMulti otherBound = mock(ParDo.BoundMulti.class, "otherBound");
AggregatorProvidingDoFn<Long, Long> otherFn = new AggregatorProvidingDoFn<>();
- Aggregator<Double, Double> aggregatorTwo = otherFn.addAggregator(new Sum.SumDoubleFn());
+ Aggregator<Double, Double> aggregatorTwo = otherFn.addAggregator(Sum.ofDoubles());
when(otherBound.getFn()).thenReturn(otherFn);
http://git-wip-us.apache.org/repos/asf/beam/blob/78a360ea/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
index cdd4707..4d35e53 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
@@ -41,8 +41,6 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn;
import org.apache.beam.sdk.transforms.CombineFns.CoCombineResult;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import org.apache.beam.sdk.transforms.Max.MaxIntegerFn;
-import org.apache.beam.sdk.transforms.Min.MinIntegerFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -71,8 +69,8 @@ public class CombineFnsTest {
TupleTag<Integer> tag = new TupleTag<Integer>();
CombineFns.compose()
- .with(new GetIntegerFunction(), new MaxIntegerFn(), tag)
- .with(new GetIntegerFunction(), new MinIntegerFn(), tag);
+ .with(new GetIntegerFunction(), Max.ofIntegers(), tag)
+ .with(new GetIntegerFunction(), Min.ofIntegers(), tag);
}
@Test
@@ -82,8 +80,8 @@ public class CombineFnsTest {
TupleTag<Integer> tag = new TupleTag<Integer>();
CombineFns.composeKeyed()
- .with(new GetIntegerFunction(), new MaxIntegerFn(), tag)
- .with(new GetIntegerFunction(), new MinIntegerFn(), tag);
+ .with(new GetIntegerFunction(), Max.ofIntegers(), tag)
+ .with(new GetIntegerFunction(), Min.ofIntegers(), tag);
}
@Test
@@ -145,7 +143,7 @@ public class CombineFnsTest {
.apply(Combine.globally(CombineFns.compose()
.with(
new GetIntegerFunction(),
- new MaxIntegerFn(),
+ Max.ofIntegers(),
maxIntTag)
.with(
new GetUserStringFunction(),
@@ -159,7 +157,7 @@ public class CombineFnsTest {
.apply(Combine.perKey(CombineFns.composeKeyed()
.with(
new GetIntegerFunction(),
- new MaxIntegerFn().<String>asKeyedFn(),
+ Max.ofIntegers().<String>asKeyedFn(),
maxIntTag)
.with(
new GetUserStringFunction(),
@@ -203,7 +201,7 @@ public class CombineFnsTest {
.apply(Combine.globally(CombineFns.compose()
.with(
new GetIntegerFunction(),
- new MaxIntegerFn(),
+ Max.ofIntegers(),
maxIntTag)
.with(
new GetUserStringFunction(),
@@ -219,7 +217,7 @@ public class CombineFnsTest {
.apply(Combine.perKey(CombineFns.composeKeyed()
.with(
new GetIntegerFunction(),
- new MaxIntegerFn().<String>asKeyedFn(),
+ Max.ofIntegers().<String>asKeyedFn(),
maxIntTag)
.with(
new GetUserStringFunction(),
@@ -262,7 +260,7 @@ public class CombineFnsTest {
.apply(Combine.perKey(CombineFns.composeKeyed()
.with(
new GetIntegerFunction(),
- new MaxIntegerFn().<String>asKeyedFn(),
+ Max.ofIntegers().<String>asKeyedFn(),
maxIntTag)
.with(
new GetUserStringFunction(),
[5/7] beam git commit: [BEAM-1223] Relaxed some visibility concerns
for Sum*Fn, Min*Fn, Max*Fn and MeanFn until it's supported by Dataflow.
Posted by lc...@apache.org.
[BEAM-1223] Relaxed some visibility concerns for Sum*Fn, Min*Fn, Max*Fn and MeanFn until it's supported by Dataflow.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0697b059
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0697b059
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0697b059
Branch: refs/heads/master
Commit: 0697b05956e2d431c0ec9034e239f67eeff9c040
Parents: 488de7b
Author: Stas Levin <st...@gmail.com>
Authored: Tue Jan 3 20:59:22 2017 +0200
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Jan 3 14:29:05 2017 -0800
----------------------------------------------------------------------
.../src/main/java/org/apache/beam/sdk/transforms/Max.java | 8 ++++----
.../src/main/java/org/apache/beam/sdk/transforms/Mean.java | 6 +++---
.../src/main/java/org/apache/beam/sdk/transforms/Min.java | 8 ++++----
.../src/main/java/org/apache/beam/sdk/transforms/Sum.java | 6 +++---
4 files changed, 14 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/0697b059/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
index 91851bc..69aaaf2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
@@ -212,7 +212,7 @@ public class Max {
/////////////////////////////////////////////////////////////////////////////
- private static class MaxFn<T> extends BinaryCombineFn<T> {
+ public static class MaxFn<T> extends BinaryCombineFn<T> {
private final T identity;
private final Comparator<? super T> comparator;
@@ -241,7 +241,7 @@ public class Max {
}
}
- private static class MaxIntegerFn extends Combine.BinaryCombineIntegerFn {
+ public static class MaxIntegerFn extends Combine.BinaryCombineIntegerFn {
@Override
public int apply(int left, int right) {
@@ -254,7 +254,7 @@ public class Max {
}
}
- private static class MaxLongFn extends Combine.BinaryCombineLongFn {
+ public static class MaxLongFn extends Combine.BinaryCombineLongFn {
@Override
public long apply(long left, long right) {
@@ -267,7 +267,7 @@ public class Max {
}
}
- private static class MaxDoubleFn extends Combine.BinaryCombineDoubleFn {
+ public static class MaxDoubleFn extends Combine.BinaryCombineDoubleFn {
@Override
public double apply(double left, double right) {
http://git-wip-us.apache.org/repos/asf/beam/blob/0697b059/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
index 7e62e9d..d650f07 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
@@ -64,7 +64,7 @@ public class Mean {
* @param <NumT> the type of the {@code Number}s being combined
*/
public static <NumT extends Number> Combine.Globally<NumT, Double> globally() {
- return Combine.<NumT, Double>globally(new MeanFn<>());
+ return Combine.<NumT, Double>globally(Mean.<NumT>of());
}
/**
@@ -81,7 +81,7 @@ public class Mean {
* @param <NumT> the type of the {@code Number}s being combined
*/
public static <K, NumT extends Number> Combine.PerKey<K, NumT, Double> perKey() {
- return Combine.<K, NumT, Double>perKey(new MeanFn<>());
+ return Combine.<K, NumT, Double>perKey(Mean.<NumT>of());
}
/**
@@ -101,7 +101,7 @@ public class Mean {
/////////////////////////////////////////////////////////////////////////////
- private static class MeanFn<NumT extends Number>
+ public static class MeanFn<NumT extends Number>
extends Combine.AccumulatingCombineFn<NumT, CountSum<NumT>, Double> {
/**
* Constructs a combining function that computes the mean over
http://git-wip-us.apache.org/repos/asf/beam/blob/0697b059/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
index 109f4e5..ce3b64a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
@@ -212,7 +212,7 @@ public class Min {
/////////////////////////////////////////////////////////////////////////////
- private static class MinFn<T> extends BinaryCombineFn<T> {
+ public static class MinFn<T> extends BinaryCombineFn<T> {
private final T identity;
private final Comparator<? super T> comparator;
@@ -241,7 +241,7 @@ public class Min {
}
}
- private static class MinIntegerFn extends Combine.BinaryCombineIntegerFn {
+ public static class MinIntegerFn extends Combine.BinaryCombineIntegerFn {
@Override
public int apply(int left, int right) {
@@ -254,7 +254,7 @@ public class Min {
}
}
- private static class MinLongFn extends Combine.BinaryCombineLongFn {
+ public static class MinLongFn extends Combine.BinaryCombineLongFn {
@Override
public long apply(long left, long right) {
@@ -267,7 +267,7 @@ public class Min {
}
}
- private static class MinDoubleFn extends Combine.BinaryCombineDoubleFn {
+ public static class MinDoubleFn extends Combine.BinaryCombineDoubleFn {
@Override
public double apply(double left, double right) {
http://git-wip-us.apache.org/repos/asf/beam/blob/0697b059/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
index ccade4d..4e0f680 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
@@ -140,7 +140,7 @@ public class Sum {
/////////////////////////////////////////////////////////////////////////////
- private static class SumIntegerFn extends Combine.BinaryCombineIntegerFn {
+ public static class SumIntegerFn extends Combine.BinaryCombineIntegerFn {
@Override
public int apply(int a, int b) {
@@ -153,7 +153,7 @@ public class Sum {
}
}
- private static class SumLongFn extends Combine.BinaryCombineLongFn {
+ public static class SumLongFn extends Combine.BinaryCombineLongFn {
@Override
public long apply(long a, long b) {
@@ -166,7 +166,7 @@ public class Sum {
}
}
- private static class SumDoubleFn extends Combine.BinaryCombineDoubleFn {
+ public static class SumDoubleFn extends Combine.BinaryCombineDoubleFn {
@Override
public double apply(double a, double b) {
[4/7] beam git commit: [BEAM-1223] Fixed some javadoc.
Posted by lc...@apache.org.
[BEAM-1223] Fixed some javadoc.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/488de7b1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/488de7b1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/488de7b1
Branch: refs/heads/master
Commit: 488de7b1c6c973ea7f711839a436f9edc2094488
Parents: 6b68699
Author: Stas Levin <st...@gmail.com>
Authored: Tue Jan 3 20:48:42 2017 +0200
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Jan 3 14:28:21 2017 -0800
----------------------------------------------------------------------
.../apache/beam/runners/spark/aggregators/NamedAggregators.java | 4 ++--
.../core/src/main/java/org/apache/beam/sdk/transforms/Min.java | 4 ++--
2 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/488de7b1/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
index 52fe994..b5aec32 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
@@ -153,9 +153,9 @@ public class NamedAggregators implements Serializable {
}
/**
- * @param <InputT> Input data type
+ * @param <InputT> Input data type
* @param <InterT> Intermediate data type (useful for averages)
- * @param <OutputT> Output data type
+ * @param <OutputT> Output data type
*/
public static class CombineFunctionState<InputT, InterT, OutputT>
implements State<InputT, InterT, OutputT> {
http://git-wip-us.apache.org/repos/asf/beam/blob/488de7b1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
index 3518a35..109f4e5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
@@ -134,7 +134,7 @@ public class Min {
}
/**
- * A {@code CombineFn} that computes the maximum of a collection of elements of type {@code T}
+ * A {@code CombineFn} that computes the minimum of a collection of elements of type {@code T}
* using an arbitrary {@link Comparator} and an {@code identity},
* useful as an argument to {@link Combine#globally} or {@link Combine#perKey}.
*
@@ -146,7 +146,7 @@ public class Min {
}
/**
- * A {@code CombineFn} that computes the maximum of a collection of elements of type {@code T}
+ * A {@code CombineFn} that computes the minimum of a collection of elements of type {@code T}
* using an arbitrary {@link Comparator}, useful as an argument to {@link Combine#globally} or
* {@link Combine#perKey}.
*
[6/7] beam git commit: [BEAM-1223] Added the missing javadocs.
Posted by lc...@apache.org.
[BEAM-1223] Added the missing javadocs.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3e16e325
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3e16e325
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3e16e325
Branch: refs/heads/master
Commit: 3e16e325a6d88ac7b21bb9f79357b14dd280becd
Parents: 0697b05
Author: Stas Levin <st...@gmail.com>
Authored: Tue Jan 3 21:36:12 2017 +0200
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Jan 3 14:29:06 2017 -0800
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/transforms/Max.java | 19 +++++++++++++++++++
.../org/apache/beam/sdk/transforms/Mean.java | 10 ++++++++++
.../java/org/apache/beam/sdk/transforms/Min.java | 19 +++++++++++++++++++
.../java/org/apache/beam/sdk/transforms/Sum.java | 15 +++++++++++++++
4 files changed, 63 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/3e16e325/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
index 69aaaf2..f6460b6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
@@ -212,6 +212,13 @@ public class Max {
/////////////////////////////////////////////////////////////////////////////
+ /**
+ * A {@code CombineFn} that computes the maximum of a collection of elements of type {@code T}
+ * using an arbitrary {@link Comparator}, useful as an argument to {@link Combine#globally} or
+ * {@link Combine#perKey}.
+ *
+ * @param <T> the type of the values being compared
+ */
public static class MaxFn<T> extends BinaryCombineFn<T> {
private final T identity;
@@ -241,6 +248,10 @@ public class Max {
}
}
+ /**
+ * A {@code CombineFn} that computes the maximum of a collection of {@code Integer}s, useful as an
+ * argument to {@link Combine#globally} or {@link Combine#perKey}.
+ */
public static class MaxIntegerFn extends Combine.BinaryCombineIntegerFn {
@Override
@@ -254,6 +265,10 @@ public class Max {
}
}
+ /**
+ * A {@code CombineFn} that computes the maximum of a collection of {@code Long}s, useful as an
+ * argument to {@link Combine#globally} or {@link Combine#perKey}.
+ */
public static class MaxLongFn extends Combine.BinaryCombineLongFn {
@Override
@@ -267,6 +282,10 @@ public class Max {
}
}
+ /**
+ * A {@code CombineFn} that computes the maximum of a collection of {@code Double}s, useful as an
+ * argument to {@link Combine#globally} or {@link Combine#perKey}.
+ */
public static class MaxDoubleFn extends Combine.BinaryCombineDoubleFn {
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/3e16e325/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
index d650f07..17dbe6c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
@@ -101,6 +101,16 @@ public class Mean {
/////////////////////////////////////////////////////////////////////////////
+ /**
+ * A {@code Combine.CombineFn} that computes the arithmetic mean
+ * (a.k.a. average) of an {@code Iterable} of numbers of type
+ * {@code NumT}, useful as an argument to {@link Combine#globally} or
+ * {@link Combine#perKey}.
+ *
+ * <p>Returns {@code Double.NaN} if combining zero elements.
+ *
+ * @param <NumT> the type of the {@code Number}s being combined
+ */
public static class MeanFn<NumT extends Number>
extends Combine.AccumulatingCombineFn<NumT, CountSum<NumT>, Double> {
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/3e16e325/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
index ce3b64a..47d831c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
@@ -212,6 +212,13 @@ public class Min {
/////////////////////////////////////////////////////////////////////////////
+ /**
+ * A {@code CombineFn} that computes the minimum of a collection of elements of type {@code T}
+ * using an arbitrary {@link Comparator}, useful as an argument to {@link Combine#globally} or
+ * {@link Combine#perKey}.
+ *
+ * @param <T> the type of the values being compared
+ */
public static class MinFn<T> extends BinaryCombineFn<T> {
private final T identity;
@@ -241,6 +248,10 @@ public class Min {
}
}
+ /**
+ * A {@code CombineFn} that computes the minimum of a collection of {@code Integer}s, useful as an
+ * argument to {@link Combine#globally} or {@link Combine#perKey}.
+ */
public static class MinIntegerFn extends Combine.BinaryCombineIntegerFn {
@Override
@@ -254,6 +265,10 @@ public class Min {
}
}
+ /**
+ * A {@code CombineFn} that computes the minimum of a collection of {@code Long}s, useful as an
+ * argument to {@link Combine#globally} or {@link Combine#perKey}.
+ */
public static class MinLongFn extends Combine.BinaryCombineLongFn {
@Override
@@ -267,6 +282,10 @@ public class Min {
}
}
+ /**
+ * A {@code CombineFn} that computes the minimum of a collection of {@code Double}s, useful as an
+ * argument to {@link Combine#globally} or {@link Combine#perKey}.
+ */
public static class MinDoubleFn extends Combine.BinaryCombineDoubleFn {
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/3e16e325/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
index 4e0f680..5044732 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
@@ -140,6 +140,11 @@ public class Sum {
/////////////////////////////////////////////////////////////////////////////
+ /**
+ * A {@code SerializableFunction} that computes the sum of an
+ * {@code Iterable} of {@code Integer}s, useful as an argument to
+ * {@link Combine#globally} or {@link Combine#perKey}.
+ */
public static class SumIntegerFn extends Combine.BinaryCombineIntegerFn {
@Override
@@ -153,6 +158,11 @@ public class Sum {
}
}
+ /**
+ * A {@code SerializableFunction} that computes the sum of an
+ * {@code Iterable} of {@code Long}s, useful as an argument to
+ * {@link Combine#globally} or {@link Combine#perKey}.
+ */
public static class SumLongFn extends Combine.BinaryCombineLongFn {
@Override
@@ -166,6 +176,11 @@ public class Sum {
}
}
+ /**
+ * A {@code SerializableFunction} that computes the sum of an
+ * {@code Iterable} of {@code Double}s, useful as an argument to
+ * {@link Combine#globally} or {@link Combine#perKey}.
+ */
public static class SumDoubleFn extends Combine.BinaryCombineDoubleFn {
@Override
[7/7] beam git commit: [BEAM-1223] Introduced Sum, Min,
Max#ofLongs()/ofDoubles()/ofIntegers()
Posted by lc...@apache.org.
[BEAM-1223] Introduced Sum,Min,Max#ofLongs()/ofDoubles()/ofIntegers()
This closes #1723
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d86db15b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d86db15b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d86db15b
Branch: refs/heads/master
Commit: d86db15ba22cbd99093327dc4962e06fa2d5db43
Parents: e794f14 3e16e32
Author: Luke Cwik <lc...@google.com>
Authored: Tue Jan 3 14:36:24 2017 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Jan 3 14:36:24 2017 -0800
----------------------------------------------------------------------
.../beam/examples/DebuggingWordCount.java | 4 +-
.../org/apache/beam/examples/WordCount.java | 2 +-
.../cookbook/CombinePerKeyExamples.java | 2 +-
.../beam/examples/complete/game/GameStats.java | 2 +-
.../beam/examples/complete/game/UserScore.java | 2 +-
.../runners/apex/examples/WordCountTest.java | 2 +-
.../utils/ApexStateInternalsTest.java | 2 +-
.../core/GroupAlsoByWindowViaWindowSetDoFn.java | 4 +-
.../runners/core/GroupAlsoByWindowsDoFn.java | 4 +-
.../apache/beam/runners/core/NonEmptyPanes.java | 2 +-
.../AfterDelayFromFirstElementStateMachine.java | 2 +-
.../core/triggers/AfterPaneStateMachine.java | 2 +-
.../core/LateDataDroppingDoFnRunnerTest.java | 2 +-
.../beam/runners/core/ReduceFnRunnerTest.java | 12 +--
.../beam/runners/core/ReduceFnTester.java | 2 +-
.../GroupAlsoByWindowEvaluatorFactory.java | 4 +-
.../runners/direct/AggregatorContainerTest.java | 16 ++--
.../CopyOnAccessInMemoryStateInternalsTest.java | 4 +-
.../runners/direct/EvaluationContextTest.java | 6 +-
.../beam/runners/flink/examples/WordCount.java | 2 +-
.../flink/examples/streaming/AutoComplete.java | 2 +-
.../KafkaWindowedWordCountExample.java | 2 +-
.../examples/streaming/WindowedWordCount.java | 2 +-
.../streaming/FlinkStateInternalsTest.java | 2 +-
.../dataflow/DataflowPipelineJobTest.java | 8 +-
.../spark/aggregators/NamedAggregators.java | 4 +-
.../beam/runners/spark/examples/WordCount.java | 2 +-
.../translation/SparkGroupAlsoByWindowFn.java | 2 +-
.../spark/translation/SparkRuntimeContext.java | 63 +++++----------
.../ResumeFromCheckpointStreamingTest.java | 2 +-
.../streaming/utils/PAssertStreaming.java | 4 +-
.../apache/beam/sdk/io/PubsubUnboundedSink.java | 8 +-
.../beam/sdk/io/PubsubUnboundedSource.java | 5 +-
.../org/apache/beam/sdk/testing/PAssert.java | 12 +--
.../org/apache/beam/sdk/transforms/Max.java | 85 +++++++++++++++-----
.../org/apache/beam/sdk/transforms/Mean.java | 21 ++++-
.../org/apache/beam/sdk/transforms/Min.java | 83 ++++++++++++++-----
.../org/apache/beam/sdk/transforms/Sum.java | 44 ++++++++--
.../windowing/AfterDelayFromFirstElement.java | 2 +-
.../sdk/transforms/windowing/AfterPane.java | 2 +-
.../sdk/AggregatorPipelineExtractorTest.java | 16 ++--
.../beam/sdk/transforms/CombineFnsTest.java | 20 +++--
.../apache/beam/sdk/transforms/DoFnTest.java | 15 ++--
.../beam/sdk/transforms/DoFnTesterTest.java | 6 +-
.../org/apache/beam/sdk/transforms/MaxTest.java | 6 +-
.../apache/beam/sdk/transforms/MeanTest.java | 2 +-
.../org/apache/beam/sdk/transforms/MinTest.java | 6 +-
.../beam/sdk/transforms/OldDoFnContextTest.java | 2 +-
.../apache/beam/sdk/transforms/OldDoFnTest.java | 11 ++-
.../beam/sdk/transforms/SimpleStatsFnsTest.java | 36 ++++-----
.../org/apache/beam/sdk/transforms/SumTest.java | 12 +--
.../apache/beam/sdk/transforms/ViewTest.java | 2 +-
.../apache/beam/sdk/util/CombineFnUtilTest.java | 8 +-
.../util/state/InMemoryStateInternalsTest.java | 2 +-
.../beam/sdk/util/state/StateTagTest.java | 11 ++-
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 2 +-
56 files changed, 341 insertions(+), 249 deletions(-)
----------------------------------------------------------------------