You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/02 18:56:20 UTC
[2/6] beam git commit: Simplify type parameters of StateSpec and related
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 52b2f5e..26904aa 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -1625,7 +1625,7 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, Integer>, Integer>() {
@StateId(stateId)
- private final StateSpec<Object, ValueState<Integer>> intState =
+ private final StateSpec<ValueState<Integer>> intState =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
@@ -1654,7 +1654,7 @@ public class ParDoTest implements Serializable {
new DoFn<KV<Integer, Integer>, Integer>() {
@StateId(stateId)
- private final StateSpec<Object, ValueState<Integer>> seenSpec =
+ private final StateSpec<ValueState<Integer>> seenSpec =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
@@ -1704,7 +1704,7 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, Integer>, MyInteger>() {
@StateId(stateId)
- private final StateSpec<Object, ValueState<MyInteger>> intState =
+ private final StateSpec<ValueState<MyInteger>> intState =
StateSpecs.value();
@ProcessElement
@@ -1734,7 +1734,7 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, Integer>, MyInteger>() {
@StateId(stateId)
- private final StateSpec<Object, ValueState<MyInteger>> intState =
+ private final StateSpec<ValueState<MyInteger>> intState =
StateSpecs.value();
@ProcessElement
@@ -1765,7 +1765,7 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, MyInteger>, MyInteger>() {
@StateId(stateId)
- private final StateSpec<Object, ValueState<MyInteger>> intState =
+ private final StateSpec<ValueState<MyInteger>> intState =
StateSpecs.value();
@ProcessElement
@@ -1797,7 +1797,7 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, Integer>, List<MyInteger>>() {
@StateId(stateId)
- private final StateSpec<Object, ValueState<List<MyInteger>>> intState =
+ private final StateSpec<ValueState<List<MyInteger>>> intState =
StateSpecs.value();
@ProcessElement
@@ -1828,7 +1828,7 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, Integer>, Integer>() {
@StateId(stateId)
- private final StateSpec<Object, ValueState<Integer>> intState =
+ private final StateSpec<ValueState<Integer>> intState =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
@@ -1876,7 +1876,7 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, Integer>, KV<String, Integer>>() {
@StateId(stateId)
- private final StateSpec<Object, ValueState<Integer>> intState =
+ private final StateSpec<ValueState<Integer>> intState =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
@@ -1892,7 +1892,7 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, Integer>, Integer>() {
@StateId(stateId)
- private final StateSpec<Object, ValueState<Integer>> intState =
+ private final StateSpec<ValueState<Integer>> intState =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
@@ -1929,7 +1929,7 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, Integer>, Integer>() {
@StateId(stateId)
- private final StateSpec<Object, ValueState<Integer>> intState =
+ private final StateSpec<ValueState<Integer>> intState =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
@@ -1976,7 +1976,7 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, Integer>, List<Integer>>() {
@StateId(stateId)
- private final StateSpec<Object, BagState<Integer>> bufferState =
+ private final StateSpec<BagState<Integer>> bufferState =
StateSpecs.bag(VarIntCoder.of());
@ProcessElement
@@ -2013,7 +2013,7 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, Integer>, List<MyInteger>>() {
@StateId(stateId)
- private final StateSpec<Object, BagState<MyInteger>> bufferState =
+ private final StateSpec<BagState<MyInteger>> bufferState =
StateSpecs.bag();
@ProcessElement
@@ -2051,7 +2051,7 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, Integer>, List<MyInteger>>() {
@StateId(stateId)
- private final StateSpec<Object, BagState<MyInteger>> bufferState =
+ private final StateSpec<BagState<MyInteger>> bufferState =
StateSpecs.bag();
@ProcessElement
@@ -2088,10 +2088,10 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, Integer>, Set<Integer>>() {
@StateId(stateId)
- private final StateSpec<Object, SetState<Integer>> setState =
+ private final StateSpec<SetState<Integer>> setState =
StateSpecs.set(VarIntCoder.of());
@StateId(countStateId)
- private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
+ private final StateSpec<CombiningState<Integer, int[], Integer>>
countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@@ -2132,10 +2132,10 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, Integer>, Set<MyInteger>>() {
@StateId(stateId)
- private final StateSpec<Object, SetState<MyInteger>> setState = StateSpecs.set();
+ private final StateSpec<SetState<MyInteger>> setState = StateSpecs.set();
@StateId(countStateId)
- private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
+ private final StateSpec<CombiningState<Integer, int[], Integer>>
countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@@ -2175,10 +2175,10 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, Integer>, Set<MyInteger>>() {
@StateId(stateId)
- private final StateSpec<Object, SetState<MyInteger>> setState = StateSpecs.set();
+ private final StateSpec<SetState<MyInteger>> setState = StateSpecs.set();
@StateId(countStateId)
- private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
+ private final StateSpec<CombiningState<Integer, int[], Integer>>
countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@@ -2217,10 +2217,10 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, KV<String, Integer>>, KV<String, Integer>>() {
@StateId(stateId)
- private final StateSpec<Object, MapState<String, Integer>> mapState =
+ private final StateSpec<MapState<String, Integer>> mapState =
StateSpecs.map(StringUtf8Coder.of(), VarIntCoder.of());
@StateId(countStateId)
- private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
+ private final StateSpec<CombiningState<Integer, int[], Integer>>
countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@@ -2264,9 +2264,10 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>>() {
@StateId(stateId)
- private final StateSpec<Object, MapState<String, MyInteger>> mapState = StateSpecs.map();
+ private final StateSpec<MapState<String, MyInteger>> mapState = StateSpecs.map();
+
@StateId(countStateId)
- private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
+ private final StateSpec<CombiningState<Integer, int[], Integer>>
countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@@ -2310,9 +2311,10 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>>() {
@StateId(stateId)
- private final StateSpec<Object, MapState<String, MyInteger>> mapState = StateSpecs.map();
+ private final StateSpec<MapState<String, MyInteger>> mapState = StateSpecs.map();
+
@StateId(countStateId)
- private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
+ private final StateSpec<CombiningState<Integer, int[], Integer>>
countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@@ -2356,16 +2358,13 @@ public class ParDoTest implements Serializable {
private static final double EPSILON = 0.0001;
@StateId(stateId)
- private final StateSpec<
- Object, CombiningState<Double, CountSum<Double>, Double>>
- combiningState =
- StateSpecs.combining(new Mean.CountSumCoder<Double>(), Mean.<Double>of());
+ private final StateSpec<CombiningState<Double, CountSum<Double>, Double>> combiningState =
+ StateSpecs.combining(new Mean.CountSumCoder<Double>(), Mean.<Double>of());
@ProcessElement
public void processElement(
ProcessContext c,
- @StateId(stateId)
- CombiningState<Double, CountSum<Double>, Double> state) {
+ @StateId(stateId) CombiningState<Double, CountSum<Double>, Double> state) {
state.add(c.element().getValue());
Double currentValue = state.read();
if (Math.abs(currentValue - 0.5) < EPSILON) {
@@ -2396,40 +2395,38 @@ public class ParDoTest implements Serializable {
private static final int EXPECTED_SUM = 16;
@StateId(stateId)
- private final StateSpec<
- Object, CombiningState<Integer, MyInteger, Integer>>
- combiningState =
- StateSpecs.combining(new Combine.CombineFn<Integer, MyInteger, Integer>() {
- @Override
- public MyInteger createAccumulator() {
- return new MyInteger(0);
- }
-
- @Override
- public MyInteger addInput(MyInteger accumulator, Integer input) {
- return new MyInteger(accumulator.getValue() + input);
- }
-
- @Override
- public MyInteger mergeAccumulators(Iterable<MyInteger> accumulators) {
- int newValue = 0;
- for (MyInteger myInteger : accumulators) {
- newValue += myInteger.getValue();
- }
- return new MyInteger(newValue);
- }
+ private final StateSpec<CombiningState<Integer, MyInteger, Integer>> combiningState =
+ StateSpecs.combining(
+ new Combine.CombineFn<Integer, MyInteger, Integer>() {
+ @Override
+ public MyInteger createAccumulator() {
+ return new MyInteger(0);
+ }
+
+ @Override
+ public MyInteger addInput(MyInteger accumulator, Integer input) {
+ return new MyInteger(accumulator.getValue() + input);
+ }
+
+ @Override
+ public MyInteger mergeAccumulators(Iterable<MyInteger> accumulators) {
+ int newValue = 0;
+ for (MyInteger myInteger : accumulators) {
+ newValue += myInteger.getValue();
+ }
+ return new MyInteger(newValue);
+ }
- @Override
- public Integer extractOutput(MyInteger accumulator) {
- return accumulator.getValue();
- }
- });
+ @Override
+ public Integer extractOutput(MyInteger accumulator) {
+ return accumulator.getValue();
+ }
+ });
@ProcessElement
public void processElement(
ProcessContext c,
- @StateId(stateId)
- CombiningState<Integer, MyInteger, Integer> state) {
+ @StateId(stateId) CombiningState<Integer, MyInteger, Integer> state) {
state.add(c.element().getValue());
Integer currentValue = state.read();
if (currentValue == EXPECTED_SUM) {
@@ -2458,40 +2455,38 @@ public class ParDoTest implements Serializable {
private static final int EXPECTED_SUM = 16;
@StateId(stateId)
- private final StateSpec<
- Object, CombiningState<Integer, MyInteger, Integer>>
- combiningState =
- StateSpecs.combining(new Combine.CombineFn<Integer, MyInteger, Integer>() {
- @Override
- public MyInteger createAccumulator() {
- return new MyInteger(0);
- }
-
- @Override
- public MyInteger addInput(MyInteger accumulator, Integer input) {
- return new MyInteger(accumulator.getValue() + input);
- }
-
- @Override
- public MyInteger mergeAccumulators(Iterable<MyInteger> accumulators) {
- int newValue = 0;
- for (MyInteger myInteger : accumulators) {
- newValue += myInteger.getValue();
- }
- return new MyInteger(newValue);
- }
+ private final StateSpec<CombiningState<Integer, MyInteger, Integer>> combiningState =
+ StateSpecs.combining(
+ new Combine.CombineFn<Integer, MyInteger, Integer>() {
+ @Override
+ public MyInteger createAccumulator() {
+ return new MyInteger(0);
+ }
+
+ @Override
+ public MyInteger addInput(MyInteger accumulator, Integer input) {
+ return new MyInteger(accumulator.getValue() + input);
+ }
+
+ @Override
+ public MyInteger mergeAccumulators(Iterable<MyInteger> accumulators) {
+ int newValue = 0;
+ for (MyInteger myInteger : accumulators) {
+ newValue += myInteger.getValue();
+ }
+ return new MyInteger(newValue);
+ }
- @Override
- public Integer extractOutput(MyInteger accumulator) {
- return accumulator.getValue();
- }
- });
+ @Override
+ public Integer extractOutput(MyInteger accumulator) {
+ return accumulator.getValue();
+ }
+ });
@ProcessElement
public void processElement(
ProcessContext c,
- @StateId(stateId)
- CombiningState<Integer, MyInteger, Integer> state) {
+ @StateId(stateId) CombiningState<Integer, MyInteger, Integer> state) {
state.add(c.element().getValue());
Integer currentValue = state.read();
if (currentValue == EXPECTED_SUM) {
@@ -2523,7 +2518,7 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, Integer>, List<Integer>>() {
@StateId(stateId)
- private final StateSpec<Object, BagState<Integer>> bufferState =
+ private final StateSpec<BagState<Integer>> bufferState =
StateSpecs.bag(VarIntCoder.of());
@ProcessElement
@@ -2697,7 +2692,7 @@ public class ParDoTest implements Serializable {
private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@StateId(stateId)
- private final StateSpec<Object, ValueState<String>> stateSpec =
+ private final StateSpec<ValueState<String>> stateSpec =
StateSpecs.value(StringUtf8Coder.of());
@ProcessElement
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 5732438..c16eea2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -188,7 +188,7 @@ public class DoFnInvokersTest {
class MockFn extends DoFn<String, String> {
@StateId(stateId)
- private final StateSpec<Object, ValueState<Integer>> spec =
+ private final StateSpec<ValueState<Integer>> spec =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
index e1fa2d1..d6cc4f6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -542,11 +542,11 @@ public class DoFnSignaturesTest {
DoFnSignatures.getSignature(
new DoFn<KV<String, Integer>, Long>() {
@StateId("my-id")
- private final StateSpec<Object, ValueState<Integer>> myfield1 =
+ private final StateSpec<ValueState<Integer>> myfield1 =
StateSpecs.value(VarIntCoder.of());
@StateId("my-id")
- private final StateSpec<Object, ValueState<Long>> myfield2 =
+ private final StateSpec<ValueState<Long>> myfield2 =
StateSpecs.value(VarLongCoder.of());
@ProcessElement
@@ -565,7 +565,7 @@ public class DoFnSignaturesTest {
DoFnSignatures.getSignature(
new DoFn<KV<String, Integer>, Long>() {
@StateId("my-id")
- private StateSpec<Object, ValueState<Integer>> myfield =
+ private StateSpec<ValueState<Integer>> myfield =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
@@ -618,7 +618,7 @@ public class DoFnSignaturesTest {
DoFnSignatures.getSignature(
new DoFn<KV<String, Integer>, Long>() {
@StateId("my-id")
- private final StateSpec<Object, ValueState<Integer>> myfield =
+ private final StateSpec<ValueState<Integer>> myfield =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
@@ -644,7 +644,7 @@ public class DoFnSignaturesTest {
DoFnSignatures.getSignature(
new DoFn<KV<String, Integer>, Long>() {
@StateId("my-id")
- private final StateSpec<Object, ValueState<Integer>> myfield =
+ private final StateSpec<ValueState<Integer>> myfield =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
@@ -668,7 +668,7 @@ public class DoFnSignaturesTest {
DoFnSignatures.getSignature(
new DoFn<KV<String, Integer>, Long>() {
@StateId("my-id")
- private final StateSpec<Object, ValueState<Integer>> myfield =
+ private final StateSpec<ValueState<Integer>> myfield =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
@@ -683,7 +683,7 @@ public class DoFnSignaturesTest {
DoFnSignatures.getSignature(
new DoFn<KV<String, Integer>, Long>() {
@StateId("foo")
- private final StateSpec<Object, ValueState<Integer>> bizzle =
+ private final StateSpec<ValueState<Integer>> bizzle =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
@@ -728,7 +728,7 @@ public class DoFnSignaturesTest {
DoFnSignatures.getSignature(
new DoFnUsingState() {
@StateId(DoFnUsingState.STATE_ID)
- private final StateSpec<Object, ValueState<Integer>> spec =
+ private final StateSpec<ValueState<Integer>> spec =
StateSpecs.value(VarIntCoder.of());
}.getClass());
}
@@ -770,7 +770,7 @@ public class DoFnSignaturesTest {
DoFnSignatures.getSignature(
new DoFn<KV<String, Integer>, Long>() {
@StateId("foo")
- private final StateSpec<Object, ValueState<Integer>> bizzleDecl =
+ private final StateSpec<ValueState<Integer>> bizzleDecl =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
@@ -803,7 +803,7 @@ public class DoFnSignaturesTest {
public void testSimpleStateIdNamedDoFn() throws Exception {
class DoFnForTestSimpleStateIdNamedDoFn extends DoFn<KV<String, Integer>, Long> {
@StateId("foo")
- private final StateSpec<Object, ValueState<Integer>> bizzle =
+ private final StateSpec<ValueState<Integer>> bizzle =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
@@ -831,7 +831,7 @@ public class DoFnSignaturesTest {
// Note that in order to have a coder for T it will require initialization in the constructor,
// but that isn't important for this test
@StateId("foo")
- private final StateSpec<Object, ValueState<T>> bizzle = null;
+ private final StateSpec<ValueState<T>> bizzle = null;
@ProcessElement
public void foo(ProcessContext context) {}
@@ -866,7 +866,7 @@ public class DoFnSignaturesTest {
public static final String STATE_ID = "my-state-id";
@StateId(STATE_ID)
- private final StateSpec<Object, ValueState<Integer>> bizzle =
+ private final StateSpec<ValueState<Integer>> bizzle =
StateSpecs.value(VarIntCoder.of());
}
@@ -882,7 +882,7 @@ public class DoFnSignaturesTest {
public static final String STATE_ID = "my-state-id";
@StateId(STATE_ID)
- private final StateSpec<Object, ValueState<String>> myStateSpec =
+ private final StateSpec<ValueState<String>> myStateSpec =
StateSpecs.value(StringUtf8Coder.of());
@ProcessElement
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
index 9714d72..9b79d11 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
@@ -59,7 +59,7 @@ public class FakeStepContext implements StepContext {
}
@Override
- public StateInternals<?> stateInternals() {
+ public StateInternals stateInternals() {
throw new UnsupportedOperationException();
}