You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/02 18:56:20 UTC

[2/6] beam git commit: Simplify type parameters of StateSpec and related

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 52b2f5e..26904aa 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -1625,7 +1625,7 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, Integer>, Integer>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, ValueState<Integer>> intState =
+          private final StateSpec<ValueState<Integer>> intState =
               StateSpecs.value(VarIntCoder.of());
 
           @ProcessElement
@@ -1654,7 +1654,7 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<Integer, Integer>, Integer>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, ValueState<Integer>> seenSpec =
+          private final StateSpec<ValueState<Integer>> seenSpec =
               StateSpecs.value(VarIntCoder.of());
 
           @ProcessElement
@@ -1704,7 +1704,7 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, Integer>, MyInteger>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, ValueState<MyInteger>> intState =
+          private final StateSpec<ValueState<MyInteger>> intState =
               StateSpecs.value();
 
           @ProcessElement
@@ -1734,7 +1734,7 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, Integer>, MyInteger>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, ValueState<MyInteger>> intState =
+          private final StateSpec<ValueState<MyInteger>> intState =
               StateSpecs.value();
 
           @ProcessElement
@@ -1765,7 +1765,7 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, MyInteger>, MyInteger>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, ValueState<MyInteger>> intState =
+          private final StateSpec<ValueState<MyInteger>> intState =
               StateSpecs.value();
 
           @ProcessElement
@@ -1797,7 +1797,7 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, Integer>, List<MyInteger>>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, ValueState<List<MyInteger>>> intState =
+          private final StateSpec<ValueState<List<MyInteger>>> intState =
               StateSpecs.value();
 
           @ProcessElement
@@ -1828,7 +1828,7 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, Integer>, Integer>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, ValueState<Integer>> intState =
+          private final StateSpec<ValueState<Integer>> intState =
               StateSpecs.value(VarIntCoder.of());
 
           @ProcessElement
@@ -1876,7 +1876,7 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, Integer>, KV<String, Integer>>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, ValueState<Integer>> intState =
+          private final StateSpec<ValueState<Integer>> intState =
               StateSpecs.value(VarIntCoder.of());
 
           @ProcessElement
@@ -1892,7 +1892,7 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, Integer>, Integer>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, ValueState<Integer>> intState =
+          private final StateSpec<ValueState<Integer>> intState =
               StateSpecs.value(VarIntCoder.of());
 
           @ProcessElement
@@ -1929,7 +1929,7 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, Integer>, Integer>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, ValueState<Integer>> intState =
+          private final StateSpec<ValueState<Integer>> intState =
               StateSpecs.value(VarIntCoder.of());
 
           @ProcessElement
@@ -1976,7 +1976,7 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, Integer>, List<Integer>>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, BagState<Integer>> bufferState =
+          private final StateSpec<BagState<Integer>> bufferState =
               StateSpecs.bag(VarIntCoder.of());
 
           @ProcessElement
@@ -2013,7 +2013,7 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, Integer>, List<MyInteger>>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, BagState<MyInteger>> bufferState =
+          private final StateSpec<BagState<MyInteger>> bufferState =
               StateSpecs.bag();
 
           @ProcessElement
@@ -2051,7 +2051,7 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, Integer>, List<MyInteger>>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, BagState<MyInteger>> bufferState =
+          private final StateSpec<BagState<MyInteger>> bufferState =
               StateSpecs.bag();
 
           @ProcessElement
@@ -2088,10 +2088,10 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, Integer>, Set<Integer>>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, SetState<Integer>> setState =
+          private final StateSpec<SetState<Integer>> setState =
               StateSpecs.set(VarIntCoder.of());
           @StateId(countStateId)
-          private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
+          private final StateSpec<CombiningState<Integer, int[], Integer>>
               countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
@@ -2132,10 +2132,10 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, Integer>, Set<MyInteger>>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, SetState<MyInteger>> setState = StateSpecs.set();
+          private final StateSpec<SetState<MyInteger>> setState = StateSpecs.set();
 
           @StateId(countStateId)
-          private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
+          private final StateSpec<CombiningState<Integer, int[], Integer>>
               countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
@@ -2175,10 +2175,10 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, Integer>, Set<MyInteger>>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, SetState<MyInteger>> setState = StateSpecs.set();
+          private final StateSpec<SetState<MyInteger>> setState = StateSpecs.set();
 
           @StateId(countStateId)
-          private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
+          private final StateSpec<CombiningState<Integer, int[], Integer>>
               countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
@@ -2217,10 +2217,10 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, KV<String, Integer>>, KV<String, Integer>>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, MapState<String, Integer>> mapState =
+          private final StateSpec<MapState<String, Integer>> mapState =
               StateSpecs.map(StringUtf8Coder.of(), VarIntCoder.of());
           @StateId(countStateId)
-          private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
+          private final StateSpec<CombiningState<Integer, int[], Integer>>
               countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
@@ -2264,9 +2264,10 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, MapState<String, MyInteger>> mapState = StateSpecs.map();
+          private final StateSpec<MapState<String, MyInteger>> mapState = StateSpecs.map();
+
           @StateId(countStateId)
-          private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
+          private final StateSpec<CombiningState<Integer, int[], Integer>>
               countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
@@ -2310,9 +2311,10 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, MapState<String, MyInteger>> mapState = StateSpecs.map();
+          private final StateSpec<MapState<String, MyInteger>> mapState = StateSpecs.map();
+
           @StateId(countStateId)
-          private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
+          private final StateSpec<CombiningState<Integer, int[], Integer>>
               countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
@@ -2356,16 +2358,13 @@ public class ParDoTest implements Serializable {
           private static final double EPSILON = 0.0001;
 
           @StateId(stateId)
-          private final StateSpec<
-                  Object, CombiningState<Double, CountSum<Double>, Double>>
-              combiningState =
-                  StateSpecs.combining(new Mean.CountSumCoder<Double>(), Mean.<Double>of());
+          private final StateSpec<CombiningState<Double, CountSum<Double>, Double>> combiningState =
+              StateSpecs.combining(new Mean.CountSumCoder<Double>(), Mean.<Double>of());
 
           @ProcessElement
           public void processElement(
               ProcessContext c,
-              @StateId(stateId)
-                  CombiningState<Double, CountSum<Double>, Double> state) {
+              @StateId(stateId) CombiningState<Double, CountSum<Double>, Double> state) {
             state.add(c.element().getValue());
             Double currentValue = state.read();
             if (Math.abs(currentValue - 0.5) < EPSILON) {
@@ -2396,40 +2395,38 @@ public class ParDoTest implements Serializable {
           private static final int EXPECTED_SUM = 16;
 
           @StateId(stateId)
-          private final StateSpec<
-              Object, CombiningState<Integer, MyInteger, Integer>>
-              combiningState =
-              StateSpecs.combining(new Combine.CombineFn<Integer, MyInteger, Integer>() {
-                @Override
-                public MyInteger createAccumulator() {
-                  return new MyInteger(0);
-                }
-
-                @Override
-                public MyInteger addInput(MyInteger accumulator, Integer input) {
-                  return new MyInteger(accumulator.getValue() + input);
-                }
-
-                @Override
-                public MyInteger mergeAccumulators(Iterable<MyInteger> accumulators) {
-                  int newValue = 0;
-                  for (MyInteger myInteger : accumulators) {
-                    newValue += myInteger.getValue();
-                  }
-                  return new MyInteger(newValue);
-                }
+          private final StateSpec<CombiningState<Integer, MyInteger, Integer>> combiningState =
+              StateSpecs.combining(
+                  new Combine.CombineFn<Integer, MyInteger, Integer>() {
+                    @Override
+                    public MyInteger createAccumulator() {
+                      return new MyInteger(0);
+                    }
+
+                    @Override
+                    public MyInteger addInput(MyInteger accumulator, Integer input) {
+                      return new MyInteger(accumulator.getValue() + input);
+                    }
+
+                    @Override
+                    public MyInteger mergeAccumulators(Iterable<MyInteger> accumulators) {
+                      int newValue = 0;
+                      for (MyInteger myInteger : accumulators) {
+                        newValue += myInteger.getValue();
+                      }
+                      return new MyInteger(newValue);
+                    }
 
-                @Override
-                public Integer extractOutput(MyInteger accumulator) {
-                  return accumulator.getValue();
-                }
-              });
+                    @Override
+                    public Integer extractOutput(MyInteger accumulator) {
+                      return accumulator.getValue();
+                    }
+                  });
 
           @ProcessElement
           public void processElement(
               ProcessContext c,
-              @StateId(stateId)
-                  CombiningState<Integer, MyInteger, Integer> state) {
+              @StateId(stateId) CombiningState<Integer, MyInteger, Integer> state) {
             state.add(c.element().getValue());
             Integer currentValue = state.read();
             if (currentValue == EXPECTED_SUM) {
@@ -2458,40 +2455,38 @@ public class ParDoTest implements Serializable {
           private static final int EXPECTED_SUM = 16;
 
           @StateId(stateId)
-          private final StateSpec<
-              Object, CombiningState<Integer, MyInteger, Integer>>
-              combiningState =
-              StateSpecs.combining(new Combine.CombineFn<Integer, MyInteger, Integer>() {
-                @Override
-                public MyInteger createAccumulator() {
-                  return new MyInteger(0);
-                }
-
-                @Override
-                public MyInteger addInput(MyInteger accumulator, Integer input) {
-                  return new MyInteger(accumulator.getValue() + input);
-                }
-
-                @Override
-                public MyInteger mergeAccumulators(Iterable<MyInteger> accumulators) {
-                  int newValue = 0;
-                  for (MyInteger myInteger : accumulators) {
-                    newValue += myInteger.getValue();
-                  }
-                  return new MyInteger(newValue);
-                }
+          private final StateSpec<CombiningState<Integer, MyInteger, Integer>> combiningState =
+              StateSpecs.combining(
+                  new Combine.CombineFn<Integer, MyInteger, Integer>() {
+                    @Override
+                    public MyInteger createAccumulator() {
+                      return new MyInteger(0);
+                    }
+
+                    @Override
+                    public MyInteger addInput(MyInteger accumulator, Integer input) {
+                      return new MyInteger(accumulator.getValue() + input);
+                    }
+
+                    @Override
+                    public MyInteger mergeAccumulators(Iterable<MyInteger> accumulators) {
+                      int newValue = 0;
+                      for (MyInteger myInteger : accumulators) {
+                        newValue += myInteger.getValue();
+                      }
+                      return new MyInteger(newValue);
+                    }
 
-                @Override
-                public Integer extractOutput(MyInteger accumulator) {
-                  return accumulator.getValue();
-                }
-              });
+                    @Override
+                    public Integer extractOutput(MyInteger accumulator) {
+                      return accumulator.getValue();
+                    }
+                  });
 
           @ProcessElement
           public void processElement(
               ProcessContext c,
-              @StateId(stateId)
-                  CombiningState<Integer, MyInteger, Integer> state) {
+              @StateId(stateId) CombiningState<Integer, MyInteger, Integer> state) {
             state.add(c.element().getValue());
             Integer currentValue = state.read();
             if (currentValue == EXPECTED_SUM) {
@@ -2523,7 +2518,7 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, Integer>, List<Integer>>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, BagState<Integer>> bufferState =
+          private final StateSpec<BagState<Integer>> bufferState =
               StateSpecs.bag(VarIntCoder.of());
 
           @ProcessElement
@@ -2697,7 +2692,7 @@ public class ParDoTest implements Serializable {
           private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
 
           @StateId(stateId)
-          private final StateSpec<Object, ValueState<String>> stateSpec =
+          private final StateSpec<ValueState<String>> stateSpec =
               StateSpecs.value(StringUtf8Coder.of());
 
           @ProcessElement

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 5732438..c16eea2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -188,7 +188,7 @@ public class DoFnInvokersTest {
 
     class MockFn extends DoFn<String, String> {
       @StateId(stateId)
-      private final StateSpec<Object, ValueState<Integer>> spec =
+      private final StateSpec<ValueState<Integer>> spec =
           StateSpecs.value(VarIntCoder.of());
 
       @ProcessElement

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
index e1fa2d1..d6cc4f6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -542,11 +542,11 @@ public class DoFnSignaturesTest {
         DoFnSignatures.getSignature(
             new DoFn<KV<String, Integer>, Long>() {
               @StateId("my-id")
-              private final StateSpec<Object, ValueState<Integer>> myfield1 =
+              private final StateSpec<ValueState<Integer>> myfield1 =
                   StateSpecs.value(VarIntCoder.of());
 
               @StateId("my-id")
-              private final StateSpec<Object, ValueState<Long>> myfield2 =
+              private final StateSpec<ValueState<Long>> myfield2 =
                   StateSpecs.value(VarLongCoder.of());
 
               @ProcessElement
@@ -565,7 +565,7 @@ public class DoFnSignaturesTest {
         DoFnSignatures.getSignature(
             new DoFn<KV<String, Integer>, Long>() {
               @StateId("my-id")
-              private StateSpec<Object, ValueState<Integer>> myfield =
+              private StateSpec<ValueState<Integer>> myfield =
                   StateSpecs.value(VarIntCoder.of());
 
               @ProcessElement
@@ -618,7 +618,7 @@ public class DoFnSignaturesTest {
         DoFnSignatures.getSignature(
             new DoFn<KV<String, Integer>, Long>() {
               @StateId("my-id")
-              private final StateSpec<Object, ValueState<Integer>> myfield =
+              private final StateSpec<ValueState<Integer>> myfield =
                   StateSpecs.value(VarIntCoder.of());
 
               @ProcessElement
@@ -644,7 +644,7 @@ public class DoFnSignaturesTest {
         DoFnSignatures.getSignature(
             new DoFn<KV<String, Integer>, Long>() {
               @StateId("my-id")
-              private final StateSpec<Object, ValueState<Integer>> myfield =
+              private final StateSpec<ValueState<Integer>> myfield =
                   StateSpecs.value(VarIntCoder.of());
 
               @ProcessElement
@@ -668,7 +668,7 @@ public class DoFnSignaturesTest {
         DoFnSignatures.getSignature(
             new DoFn<KV<String, Integer>, Long>() {
               @StateId("my-id")
-              private final StateSpec<Object, ValueState<Integer>> myfield =
+              private final StateSpec<ValueState<Integer>> myfield =
                   StateSpecs.value(VarIntCoder.of());
 
               @ProcessElement
@@ -683,7 +683,7 @@ public class DoFnSignaturesTest {
         DoFnSignatures.getSignature(
             new DoFn<KV<String, Integer>, Long>() {
               @StateId("foo")
-              private final StateSpec<Object, ValueState<Integer>> bizzle =
+              private final StateSpec<ValueState<Integer>> bizzle =
                   StateSpecs.value(VarIntCoder.of());
 
               @ProcessElement
@@ -728,7 +728,7 @@ public class DoFnSignaturesTest {
         DoFnSignatures.getSignature(
             new DoFnUsingState() {
               @StateId(DoFnUsingState.STATE_ID)
-              private final StateSpec<Object, ValueState<Integer>> spec =
+              private final StateSpec<ValueState<Integer>> spec =
                   StateSpecs.value(VarIntCoder.of());
             }.getClass());
   }
@@ -770,7 +770,7 @@ public class DoFnSignaturesTest {
         DoFnSignatures.getSignature(
             new DoFn<KV<String, Integer>, Long>() {
               @StateId("foo")
-              private final StateSpec<Object, ValueState<Integer>> bizzleDecl =
+              private final StateSpec<ValueState<Integer>> bizzleDecl =
                   StateSpecs.value(VarIntCoder.of());
 
               @ProcessElement
@@ -803,7 +803,7 @@ public class DoFnSignaturesTest {
   public void testSimpleStateIdNamedDoFn() throws Exception {
     class DoFnForTestSimpleStateIdNamedDoFn extends DoFn<KV<String, Integer>, Long> {
       @StateId("foo")
-      private final StateSpec<Object, ValueState<Integer>> bizzle =
+      private final StateSpec<ValueState<Integer>> bizzle =
           StateSpecs.value(VarIntCoder.of());
 
       @ProcessElement
@@ -831,7 +831,7 @@ public class DoFnSignaturesTest {
       // Note that in order to have a coder for T it will require initialization in the constructor,
       // but that isn't important for this test
       @StateId("foo")
-      private final StateSpec<Object, ValueState<T>> bizzle = null;
+      private final StateSpec<ValueState<T>> bizzle = null;
 
       @ProcessElement
       public void foo(ProcessContext context) {}
@@ -866,7 +866,7 @@ public class DoFnSignaturesTest {
     public static final String STATE_ID = "my-state-id";
 
     @StateId(STATE_ID)
-    private final StateSpec<Object, ValueState<Integer>> bizzle =
+    private final StateSpec<ValueState<Integer>> bizzle =
         StateSpecs.value(VarIntCoder.of());
   }
 
@@ -882,7 +882,7 @@ public class DoFnSignaturesTest {
     public static final String STATE_ID = "my-state-id";
 
     @StateId(STATE_ID)
-    private final StateSpec<Object, ValueState<String>> myStateSpec =
+    private final StateSpec<ValueState<String>> myStateSpec =
         StateSpecs.value(StringUtf8Coder.of());
 
     @ProcessElement

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
index 9714d72..9b79d11 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
@@ -59,7 +59,7 @@ public class FakeStepContext implements StepContext {
   }
 
   @Override
-  public StateInternals<?> stateInternals() {
+  public StateInternals stateInternals() {
     throw new UnsupportedOperationException();
   }