You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/02 18:56:22 UTC
[4/6] beam git commit: Simplify type parameters of StateSpec and
related
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java
index 315110d..a056937 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java
@@ -50,11 +50,11 @@ import org.joda.time.Instant;
public class TriggerStateMachineContextFactory<W extends BoundedWindow> {
private final WindowFn<?, W> windowFn;
- private StateInternals<?> stateInternals;
+ private StateInternals stateInternals;
private final Coder<W> windowCoder;
- public TriggerStateMachineContextFactory(WindowFn<?, W> windowFn,
- StateInternals<?> stateInternals, ActiveWindowSet<W> activeWindows) {
+ public TriggerStateMachineContextFactory(
+ WindowFn<?, W> windowFn, StateInternals stateInternals, ActiveWindowSet<W> activeWindows) {
// Future triggers may be able to exploit the active window to state address window mapping.
this.windowFn = windowFn;
this.stateInternals = stateInternals;
@@ -263,7 +263,7 @@ public class TriggerStateMachineContextFactory<W extends BoundedWindow> {
}
@Override
- public <StateT extends State> StateT access(StateTag<? super Object, StateT> address) {
+ public <StateT extends State> StateT access(StateTag<StateT> address) {
return stateInternals.state(windowNamespace, address);
}
}
@@ -280,13 +280,13 @@ public class TriggerStateMachineContextFactory<W extends BoundedWindow> {
@Override
public <StateT extends State> StateT access(
- StateTag<? super Object, StateT> address) {
+ StateTag<StateT> address) {
return stateInternals.state(windowNamespace, address);
}
@Override
public <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
- StateTag<? super Object, StateT> address) {
+ StateTag<StateT> address) {
ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder();
for (W mergingWindow : activeToBeMerged) {
StateT stateForWindow = stateInternals.state(namespaceFor(mergingWindow), address);
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
index e26241a..fc2f696 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
@@ -57,7 +57,7 @@ import org.joda.time.Instant;
*/
public class TriggerStateMachineRunner<W extends BoundedWindow> {
@VisibleForTesting
- public static final StateTag<Object, ValueState<BitSet>> FINISHED_BITS_TAG =
+ public static final StateTag<ValueState<BitSet>> FINISHED_BITS_TAG =
StateTags.makeSystemTagInternal(StateTags.value("closed", BitSetCoder.of()));
private final ExecutableTriggerStateMachine rootTrigger;
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
index bc33366..054a2e2 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
@@ -570,7 +570,7 @@ public class GroupAlsoByWindowsProperties {
}
private static final class CachingStateInternalsFactory<K> implements StateInternalsFactory<K> {
- private final LoadingCache<K, StateInternals<K>> stateInternalsCache;
+ private final LoadingCache<K, StateInternals> stateInternalsCache;
private CachingStateInternalsFactory() {
this.stateInternalsCache = CacheBuilder.newBuilder().build(new StateInternalsLoader<K>());
@@ -578,7 +578,7 @@ public class GroupAlsoByWindowsProperties {
@Override
@SuppressWarnings("unchecked")
- public StateInternals<K> stateInternalsForKey(K key) {
+ public StateInternals stateInternalsForKey(K key) {
try {
return stateInternalsCache.get(key);
} catch (Exception exc) {
@@ -587,9 +587,9 @@ public class GroupAlsoByWindowsProperties {
}
}
- private static class StateInternalsLoader<K> extends CacheLoader<K, StateInternals<K>> {
+ private static class StateInternalsLoader<K> extends CacheLoader<K, StateInternals> {
@Override
- public StateInternals<K> load(K key) throws Exception {
+ public StateInternals load(K key) throws Exception {
return InMemoryStateInternals.forKey(key);
}
}
@@ -686,7 +686,7 @@ public class GroupAlsoByWindowsProperties {
}
@Override
- public StateInternals<?> stateInternals() {
+ public StateInternals stateInternals() {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
index 6248401..16f7f26 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
@@ -60,22 +60,22 @@ public class InMemoryStateInternalsTest {
private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
- private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
+ private static final StateTag<ValueState<String>> STRING_VALUE_ADDR =
StateTags.value("stringValue", StringUtf8Coder.of());
- private static final StateTag<Object, CombiningState<Integer, int[], Integer>>
+ private static final StateTag<CombiningState<Integer, int[], Integer>>
SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
"sumInteger", VarIntCoder.of(), Sum.ofIntegers());
- private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+ private static final StateTag<BagState<String>> STRING_BAG_ADDR =
StateTags.bag("stringBag", StringUtf8Coder.of());
- private static final StateTag<Object, SetState<String>> STRING_SET_ADDR =
+ private static final StateTag<SetState<String>> STRING_SET_ADDR =
StateTags.set("stringSet", StringUtf8Coder.of());
- private static final StateTag<Object, MapState<String, Integer>> STRING_MAP_ADDR =
+ private static final StateTag<MapState<String, Integer>> STRING_MAP_ADDR =
StateTags.map("stringMap", StringUtf8Coder.of(), VarIntCoder.of());
- private static final StateTag<Object, WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
+ private static final StateTag<WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
- private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR =
+ private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
- private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR =
+ private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
InMemoryStateInternals<String> underTest = InMemoryStateInternals.forKey("dummyKey");
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java
index 95d6977..7a83a18 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java
@@ -45,7 +45,7 @@ import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class MergingActiveWindowSetTest {
private Sessions windowFn;
- private StateInternals<String> state;
+ private StateInternals state;
private MergingActiveWindowSet<IntervalWindow> set;
private ActiveWindowSet.MergeCallback<IntervalWindow> callback;
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index eba0f67..573855f 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -314,14 +314,14 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
public final void assertHasOnlyGlobalAndFinishedSetsFor(W... expectedWindows) {
assertHasOnlyGlobalAndAllowedTags(
ImmutableSet.copyOf(expectedWindows),
- ImmutableSet.<StateTag<? super String, ?>>of(TriggerStateMachineRunner.FINISHED_BITS_TAG));
+ ImmutableSet.<StateTag<?>>of(TriggerStateMachineRunner.FINISHED_BITS_TAG));
}
@SafeVarargs
public final void assertHasOnlyGlobalAndFinishedSetsAndPaneInfoFor(W... expectedWindows) {
assertHasOnlyGlobalAndAllowedTags(
ImmutableSet.copyOf(expectedWindows),
- ImmutableSet.<StateTag<? super String, ?>>of(
+ ImmutableSet.<StateTag<?>>of(
TriggerStateMachineRunner.FINISHED_BITS_TAG,
PaneInfoTracker.PANE_INFO_TAG,
WatermarkHold.watermarkHoldTagForTimestampCombiner(
@@ -331,14 +331,14 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
public final void assertHasOnlyGlobalState() {
assertHasOnlyGlobalAndAllowedTags(
- Collections.<W>emptySet(), Collections.<StateTag<? super String, ?>>emptySet());
+ Collections.<W>emptySet(), Collections.<StateTag<?>>emptySet());
}
@SafeVarargs
public final void assertHasOnlyGlobalAndPaneInfoFor(W... expectedWindows) {
assertHasOnlyGlobalAndAllowedTags(
ImmutableSet.copyOf(expectedWindows),
- ImmutableSet.<StateTag<? super String, ?>>of(
+ ImmutableSet.<StateTag<?>>of(
PaneInfoTracker.PANE_INFO_TAG,
WatermarkHold.watermarkHoldTagForTimestampCombiner(
objectStrategy.getTimestampCombiner()),
@@ -350,30 +350,30 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
* {@code expectedWindows} and that each of these windows has only tags from {@code allowedTags}.
*/
private void assertHasOnlyGlobalAndAllowedTags(
- Set<W> expectedWindows, Set<StateTag<? super String, ?>> allowedTags) {
+ Set<W> expectedWindows, Set<StateTag<?>> allowedTags) {
Set<StateNamespace> expectedWindowsSet = new HashSet<>();
for (W expectedWindow : expectedWindows) {
expectedWindowsSet.add(windowNamespace(expectedWindow));
}
- Map<StateNamespace, Set<StateTag<? super String, ?>>> actualWindows = new HashMap<>();
+ Map<StateNamespace, Set<StateTag<?>>> actualWindows = new HashMap<>();
for (StateNamespace namespace : stateInternals.getNamespacesInUse()) {
if (namespace instanceof StateNamespaces.GlobalNamespace) {
continue;
} else if (namespace instanceof StateNamespaces.WindowNamespace) {
- Set<StateTag<? super String, ?>> tagsInUse = stateInternals.getTagsInUse(namespace);
+ Set<StateTag<?>> tagsInUse = stateInternals.getTagsInUse(namespace);
if (tagsInUse.isEmpty()) {
continue;
}
actualWindows.put(namespace, tagsInUse);
- Set<StateTag<? super String, ?>> unexpected = Sets.difference(tagsInUse, allowedTags);
+ Set<StateTag<?>> unexpected = Sets.difference(tagsInUse, allowedTags);
if (unexpected.isEmpty()) {
continue;
} else {
fail(namespace + " has unexpected states: " + tagsInUse);
}
} else if (namespace instanceof StateNamespaces.WindowAndTriggerNamespace) {
- Set<StateTag<? super String, ?>> tagsInUse = stateInternals.getTagsInUse(namespace);
+ Set<StateTag<?>> tagsInUse = stateInternals.getTagsInUse(namespace);
assertTrue(namespace + " contains " + tagsInUse, tagsInUse.isEmpty());
} else {
fail("Unrecognized namespace " + namespace);
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index 1a44453..a67db6d 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -231,7 +231,7 @@ public class SplittableParDoTest {
processFn.setStateInternalsFactory(
new StateInternalsFactory<String>() {
@Override
- public StateInternals<String> stateInternalsForKey(String key) {
+ public StateInternals stateInternalsForKey(String key) {
return stateInternals;
}
});
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
index 9a8b75c..fc08dcc 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
@@ -41,10 +41,10 @@ import org.junit.runners.JUnit4;
public class StateTagTest {
@Test
public void testValueEquality() {
- StateTag<?, ?> fooVarInt1 = StateTags.value("foo", VarIntCoder.of());
- StateTag<?, ?> fooVarInt2 = StateTags.value("foo", VarIntCoder.of());
- StateTag<?, ?> fooBigEndian = StateTags.value("foo", BigEndianIntegerCoder.of());
- StateTag<?, ?> barVarInt = StateTags.value("bar", VarIntCoder.of());
+ StateTag<?> fooVarInt1 = StateTags.value("foo", VarIntCoder.of());
+ StateTag<?> fooVarInt2 = StateTags.value("foo", VarIntCoder.of());
+ StateTag<?> fooBigEndian = StateTags.value("foo", BigEndianIntegerCoder.of());
+ StateTag<?> barVarInt = StateTags.value("bar", VarIntCoder.of());
assertEquals(fooVarInt1, fooVarInt2);
assertNotEquals(fooVarInt1, fooBigEndian);
@@ -53,10 +53,10 @@ public class StateTagTest {
@Test
public void testBagEquality() {
- StateTag<?, ?> fooVarInt1 = StateTags.bag("foo", VarIntCoder.of());
- StateTag<?, ?> fooVarInt2 = StateTags.bag("foo", VarIntCoder.of());
- StateTag<?, ?> fooBigEndian = StateTags.bag("foo", BigEndianIntegerCoder.of());
- StateTag<?, ?> barVarInt = StateTags.bag("bar", VarIntCoder.of());
+ StateTag<?> fooVarInt1 = StateTags.bag("foo", VarIntCoder.of());
+ StateTag<?> fooVarInt2 = StateTags.bag("foo", VarIntCoder.of());
+ StateTag<?> fooBigEndian = StateTags.bag("foo", BigEndianIntegerCoder.of());
+ StateTag<?> barVarInt = StateTags.bag("bar", VarIntCoder.of());
assertEquals(fooVarInt1, fooVarInt2);
assertNotEquals(fooVarInt1, fooBigEndian);
@@ -65,10 +65,10 @@ public class StateTagTest {
@Test
public void testSetEquality() {
- StateTag<?, ?> fooVarInt1 = StateTags.set("foo", VarIntCoder.of());
- StateTag<?, ?> fooVarInt2 = StateTags.set("foo", VarIntCoder.of());
- StateTag<?, ?> fooBigEndian = StateTags.set("foo", BigEndianIntegerCoder.of());
- StateTag<?, ?> barVarInt = StateTags.set("bar", VarIntCoder.of());
+ StateTag<?> fooVarInt1 = StateTags.set("foo", VarIntCoder.of());
+ StateTag<?> fooVarInt2 = StateTags.set("foo", VarIntCoder.of());
+ StateTag<?> fooBigEndian = StateTags.set("foo", BigEndianIntegerCoder.of());
+ StateTag<?> barVarInt = StateTags.set("bar", VarIntCoder.of());
assertEquals(fooVarInt1, fooVarInt2);
assertNotEquals(fooVarInt1, fooBigEndian);
@@ -77,15 +77,15 @@ public class StateTagTest {
@Test
public void testMapEquality() {
- StateTag<?, ?> fooStringVarInt1 =
+ StateTag<?> fooStringVarInt1 =
StateTags.map("foo", StringUtf8Coder.of(), VarIntCoder.of());
- StateTag<?, ?> fooStringVarInt2 =
+ StateTag<?> fooStringVarInt2 =
StateTags.map("foo", StringUtf8Coder.of(), VarIntCoder.of());
- StateTag<?, ?> fooStringBigEndian =
+ StateTag<?> fooStringBigEndian =
StateTags.map("foo", StringUtf8Coder.of(), BigEndianIntegerCoder.of());
- StateTag<?, ?> fooVarIntBigEndian =
+ StateTag<?> fooVarIntBigEndian =
StateTags.map("foo", VarIntCoder.of(), BigEndianIntegerCoder.of());
- StateTag<?, ?> barStringVarInt =
+ StateTag<?> barStringVarInt =
StateTags.map("bar", StringUtf8Coder.of(), VarIntCoder.of());
assertEquals(fooStringVarInt1, fooStringVarInt2);
@@ -97,11 +97,11 @@ public class StateTagTest {
@Test
public void testWatermarkBagEquality() {
- StateTag<?, ?> foo1 = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
- StateTag<?, ?> foo2 = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
- StateTag<?, ?> bar = StateTags.watermarkStateInternal("bar", TimestampCombiner.EARLIEST);
+ StateTag<?> foo1 = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
+ StateTag<?> foo2 = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
+ StateTag<?> bar = StateTags.watermarkStateInternal("bar", TimestampCombiner.EARLIEST);
- StateTag<?, ?> bar2 = StateTags.watermarkStateInternal("bar", TimestampCombiner.LATEST);
+ StateTag<?> bar2 = StateTags.watermarkStateInternal("bar", TimestampCombiner.LATEST);
// Same id, same fn.
assertEquals(foo1, foo2);
@@ -119,12 +119,12 @@ public class StateTagTest {
Coder<Integer> input2 = BigEndianIntegerCoder.of();
Combine.BinaryCombineIntegerFn minFn = Min.ofIntegers();
- StateTag<?, ?> fooCoder1Max1 = StateTags.combiningValueFromInputInternal("foo", input1, maxFn);
- StateTag<?, ?> fooCoder1Max2 = StateTags.combiningValueFromInputInternal("foo", input1, maxFn);
- StateTag<?, ?> fooCoder1Min = StateTags.combiningValueFromInputInternal("foo", input1, minFn);
+ StateTag<?> fooCoder1Max1 = StateTags.combiningValueFromInputInternal("foo", input1, maxFn);
+ StateTag<?> fooCoder1Max2 = StateTags.combiningValueFromInputInternal("foo", input1, maxFn);
+ StateTag<?> fooCoder1Min = StateTags.combiningValueFromInputInternal("foo", input1, minFn);
- StateTag<?, ?> fooCoder2Max = StateTags.combiningValueFromInputInternal("foo", input2, maxFn);
- StateTag<?, ?> barCoder1Max = StateTags.combiningValueFromInputInternal("bar", input1, maxFn);
+ StateTag<?> fooCoder2Max = StateTags.combiningValueFromInputInternal("foo", input2, maxFn);
+ StateTag<?> barCoder1Max = StateTags.combiningValueFromInputInternal("bar", input1, maxFn);
// Same name, coder and combineFn
assertEquals(fooCoder1Max1, fooCoder1Max2);
@@ -162,16 +162,16 @@ public class StateTagTest {
Coder<int[]> accum1 = maxFn.getAccumulatorCoder(registry, VarIntCoder.of());
Coder<int[]> accum2 = minFn.getAccumulatorCoder(registry, BigEndianIntegerCoder.of());
- StateTag<?, ?> fooCoder1Max1 = StateTags.combiningValueWithContext(
+ StateTag<?> fooCoder1Max1 = StateTags.combiningValueWithContext(
"foo", accum1, CombineFnUtil.toFnWithContext(maxFn));
- StateTag<?, ?> fooCoder1Max2 = StateTags.combiningValueWithContext(
+ StateTag<?> fooCoder1Max2 = StateTags.combiningValueWithContext(
"foo", accum1, CombineFnUtil.toFnWithContext(maxFn));
- StateTag<?, ?> fooCoder1Min = StateTags.combiningValueWithContext(
+ StateTag<?> fooCoder1Min = StateTags.combiningValueWithContext(
"foo", accum1, CombineFnUtil.toFnWithContext(minFn));
- StateTag<?, ?> fooCoder2Max = StateTags.combiningValueWithContext(
+ StateTag<?> fooCoder2Max = StateTags.combiningValueWithContext(
"foo", accum2, CombineFnUtil.toFnWithContext(maxFn));
- StateTag<?, ?> barCoder1Max = StateTags.combiningValueWithContext(
+ StateTag<?> barCoder1Max = StateTags.combiningValueWithContext(
"bar", accum1, CombineFnUtil.toFnWithContext(maxFn));
// Same name, coder and combineFn
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
index aeaa63b..f80643a 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
@@ -131,7 +131,7 @@ public class StatefulDoFnRunnerTest {
timerInternals.advanceInputWatermark(new Instant(1L));
MyDoFn fn = new MyDoFn();
- StateTag<Object, ValueState<Integer>> stateTag = StateTags.tagForSpec(fn.stateId, fn.intState);
+ StateTag<ValueState<Integer>> stateTag = StateTags.tagForSpec(fn.stateId, fn.intState);
DoFnRunner<KV<String, Integer>, Integer> runner = DoFnRunners.defaultStatefulDoFnRunner(
fn,
@@ -227,7 +227,7 @@ public class StatefulDoFnRunnerTest {
public final String stateId = "foo";
@StateId(stateId)
- public final StateSpec<Object, ValueState<Integer>> intState =
+ public final StateSpec<ValueState<Integer>> intState =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
index 92d87b5..ef3a053 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
@@ -60,24 +60,25 @@ import org.joda.time.Instant;
* of {@link InMemoryState}. Whenever state that exists in the underlying {@link StateTable} is
* accessed, an independent copy will be created within this table.
*/
-public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> {
- private final K key;
- private final CopyOnAccessInMemoryStateTable<K> table;
+public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals {
+ private final CopyOnAccessInMemoryStateTable table;
+
+ private K key;
/**
* Creates a new {@link CopyOnAccessInMemoryStateInternals} with the underlying (possibly null)
* StateInternals.
*/
- public static <K> CopyOnAccessInMemoryStateInternals<K> withUnderlying(
- K key, @Nullable CopyOnAccessInMemoryStateInternals<K> underlying) {
- return new CopyOnAccessInMemoryStateInternals<K>(key, underlying);
+ public static <K> CopyOnAccessInMemoryStateInternals withUnderlying(
+ K key, @Nullable CopyOnAccessInMemoryStateInternals underlying) {
+ return new CopyOnAccessInMemoryStateInternals<>(key, underlying);
}
private CopyOnAccessInMemoryStateInternals(
- K key, CopyOnAccessInMemoryStateInternals<K> underlying) {
+ K key, CopyOnAccessInMemoryStateInternals underlying) {
this.key = key;
table =
- new CopyOnAccessInMemoryStateTable<K>(key, underlying == null ? null : underlying.table);
+ new CopyOnAccessInMemoryStateTable(underlying == null ? null : underlying.table);
}
/**
@@ -94,7 +95,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
*
* @return this table
*/
- public CopyOnAccessInMemoryStateInternals<K> commit() {
+ public CopyOnAccessInMemoryStateInternals commit() {
table.commit();
return this;
}
@@ -116,18 +117,18 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
}
@Override
- public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) {
+ public <T extends State> T state(StateNamespace namespace, StateTag<T> address) {
return state(namespace, address, StateContexts.nullContext());
}
@Override
public <T extends State> T state(
- StateNamespace namespace, StateTag<? super K, T> address, StateContext<?> c) {
+ StateNamespace namespace, StateTag<T> address, StateContext<?> c) {
return table.get(namespace, address, c);
}
@Override
- public K getKey() {
+ public Object getKey() {
return key;
}
@@ -140,9 +141,8 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
* {@link StateTable#get(StateNamespace, StateTag, StateContext)}, first attempts to obtain a
* copy of existing {@link State} from an underlying {@link StateTable}.
*/
- private static class CopyOnAccessInMemoryStateTable<K> extends StateTable<K> {
- private final K key;
- private Optional<StateTable<K>> underlying;
+ private static class CopyOnAccessInMemoryStateTable extends StateTable {
+ private Optional<StateTable> underlying;
/**
* The StateBinderFactory currently in use by this {@link CopyOnAccessInMemoryStateTable}.
@@ -162,17 +162,16 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
* when a {@link StateTag} is bound.</li>
* </ul>
*/
- private StateBinderFactory<K> binderFactory;
+ private StateBinderFactory binderFactory;
/**
* The earliest watermark hold in this table.
*/
private Optional<Instant> earliestWatermarkHold;
- public CopyOnAccessInMemoryStateTable(K key, StateTable<K> underlying) {
- this.key = key;
+ public CopyOnAccessInMemoryStateTable(StateTable underlying) {
this.underlying = Optional.fromNullable(underlying);
- binderFactory = new CopyOnBindBinderFactory<>(key, this.underlying);
+ binderFactory = new CopyOnBindBinderFactory(this.underlying);
earliestWatermarkHold = Optional.absent();
}
@@ -191,7 +190,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
private void commit() {
Instant earliestHold = getEarliestWatermarkHold();
if (underlying.isPresent()) {
- ReadThroughBinderFactory<K> readThroughBinder =
+ ReadThroughBinderFactory readThroughBinder =
new ReadThroughBinderFactory<>(underlying.get());
binderFactory = readThroughBinder;
Instant earliestUnderlyingHold = readThroughBinder.readThroughAndGetEarliestHold(this);
@@ -201,7 +200,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
}
earliestWatermarkHold = Optional.of(earliestHold);
clearEmpty();
- binderFactory = new InMemoryStateBinderFactory<>(key);
+ binderFactory = new InMemoryStateBinderFactory();
underlying = Optional.absent();
}
@@ -246,37 +245,35 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
}
@Override
- protected StateBinder<K> binderForNamespace(final StateNamespace namespace, StateContext<?> c) {
+ protected StateBinder binderForNamespace(final StateNamespace namespace, StateContext<?> c) {
return binderFactory.forNamespace(namespace, c);
}
- private interface StateBinderFactory<K> {
- StateBinder<K> forNamespace(StateNamespace namespace, StateContext<?> c);
+ private interface StateBinderFactory {
+ StateBinder forNamespace(StateNamespace namespace, StateContext<?> c);
}
/**
* {@link StateBinderFactory} that creates a copy of any existing state when the state is bound.
*/
- private static class CopyOnBindBinderFactory<K> implements StateBinderFactory<K> {
- private final K key;
- private final Optional<StateTable<K>> underlying;
+ private static class CopyOnBindBinderFactory implements StateBinderFactory {
+ private final Optional<StateTable> underlying;
- public CopyOnBindBinderFactory(K key, Optional<StateTable<K>> underlying) {
- this.key = key;
+ public CopyOnBindBinderFactory(Optional<StateTable> underlying) {
this.underlying = underlying;
}
- private boolean containedInUnderlying(StateNamespace namespace, StateTag<? super K, ?> tag) {
+ private boolean containedInUnderlying(StateNamespace namespace, StateTag<?> tag) {
return underlying.isPresent() && underlying.get().isNamespaceInUse(namespace)
&& underlying.get().getTagsInUse(namespace).containsKey(tag);
}
@Override
- public StateBinder<K> forNamespace(final StateNamespace namespace, final StateContext<?> c) {
- return new StateBinder<K>() {
+ public StateBinder forNamespace(final StateNamespace namespace, final StateContext<?> c) {
+ return new StateBinder() {
@Override
- public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
- StateTag<? super K, WatermarkHoldState> address,
+ public WatermarkHoldState bindWatermark(
+ StateTag<WatermarkHoldState> address,
TimestampCombiner timestampCombiner) {
if (containedInUnderlying(namespace, address)) {
@SuppressWarnings("unchecked")
@@ -291,7 +288,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
@Override
public <T> ValueState<T> bindValue(
- StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+ StateTag<ValueState<T>> address, Coder<T> coder) {
if (containedInUnderlying(namespace, address)) {
@SuppressWarnings("unchecked")
InMemoryState<? extends ValueState<T>> existingState =
@@ -306,7 +303,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
@Override
public <InputT, AccumT, OutputT>
CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
CombineFn<InputT, AccumT, OutputT> combineFn) {
if (containedInUnderlying(namespace, address)) {
@@ -322,7 +319,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
@Override
public <T> BagState<T> bindBag(
- StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+ StateTag<BagState<T>> address, Coder<T> elemCoder) {
if (containedInUnderlying(namespace, address)) {
@SuppressWarnings("unchecked")
InMemoryState<? extends BagState<T>> existingState =
@@ -336,7 +333,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
@Override
public <T> SetState<T> bindSet(
- StateTag<? super K, SetState<T>> address, Coder<T> elemCoder) {
+ StateTag<SetState<T>> address, Coder<T> elemCoder) {
if (containedInUnderlying(namespace, address)) {
@SuppressWarnings("unchecked")
InMemoryState<? extends SetState<T>> existingState =
@@ -350,7 +347,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
@Override
public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
- StateTag<? super K, MapState<KeyT, ValueT>> address,
+ StateTag<MapState<KeyT, ValueT>> address,
Coder<KeyT> mapKeyCoder,
Coder<ValueT> mapValueCoder) {
if (containedInUnderlying(namespace, address)) {
@@ -367,7 +364,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
@Override
public <InputT, AccumT, OutputT>
CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
return bindCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
@@ -381,17 +378,17 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
* to {@link CopyOnAccessInMemoryStateTable#commit()} to read all values from
* the underlying table.
*/
- private static class ReadThroughBinderFactory<K> implements StateBinderFactory<K> {
- private final StateTable<K> underlying;
+ private static class ReadThroughBinderFactory<K> implements StateBinderFactory {
+ private final StateTable underlying;
- public ReadThroughBinderFactory(StateTable<K> underlying) {
+ public ReadThroughBinderFactory(StateTable underlying) {
this.underlying = underlying;
}
- public Instant readThroughAndGetEarliestHold(StateTable<K> readTo) {
+ public Instant readThroughAndGetEarliestHold(StateTable readTo) {
Instant earliestHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
for (StateNamespace namespace : underlying.getNamespacesInUse()) {
- for (Map.Entry<StateTag<? super K, ?>, ? extends State> existingState :
+ for (Map.Entry<StateTag<?>, ? extends State> existingState :
underlying.getTagsInUse(namespace).entrySet()) {
if (!((InMemoryState<?>) existingState.getValue()).isCleared()) {
// Only read through non-cleared values to ensure that completed windows are
@@ -412,44 +409,44 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
}
@Override
- public StateBinder<K> forNamespace(final StateNamespace namespace, final StateContext<?> c) {
- return new StateBinder<K>() {
+ public StateBinder forNamespace(final StateNamespace namespace, final StateContext<?> c) {
+ return new StateBinder() {
@Override
- public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
- StateTag<? super K, WatermarkHoldState> address,
+ public WatermarkHoldState bindWatermark(
+ StateTag<WatermarkHoldState> address,
TimestampCombiner timestampCombiner) {
return underlying.get(namespace, address, c);
}
@Override
public <T> ValueState<T> bindValue(
- StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+ StateTag<ValueState<T>> address, Coder<T> coder) {
return underlying.get(namespace, address, c);
}
@Override
public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
bindCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
return underlying.get(namespace, address, c);
}
@Override
public <T> BagState<T> bindBag(
- StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+ StateTag<BagState<T>> address, Coder<T> elemCoder) {
return underlying.get(namespace, address, c);
}
@Override
public <T> SetState<T> bindSet(
- StateTag<? super K, SetState<T>> address, Coder<T> elemCoder) {
+ StateTag<SetState<T>> address, Coder<T> elemCoder) {
return underlying.get(namespace, address, c);
}
@Override
public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
- StateTag<? super K, MapState<KeyT, ValueT>> address,
+ StateTag<MapState<KeyT, ValueT>> address,
Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
return underlying.get(namespace, address, c);
}
@@ -457,7 +454,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
@Override
public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
bindCombiningValueWithContext(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
return bindCombiningValue(
@@ -467,16 +464,13 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
}
}
- private static class InMemoryStateBinderFactory<K> implements StateBinderFactory<K> {
- private final K key;
+ private static class InMemoryStateBinderFactory implements StateBinderFactory {
- public InMemoryStateBinderFactory(K key) {
- this.key = key;
- }
+ public InMemoryStateBinderFactory() {}
@Override
- public StateBinder<K> forNamespace(StateNamespace namespace, StateContext<?> c) {
- return new InMemoryStateBinder<>(key, c);
+ public StateBinder forNamespace(StateNamespace namespace, StateContext<?> c) {
+ return new InMemoryStateBinder(c);
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
index 1108f0d..107f39a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
@@ -34,11 +34,14 @@ class DirectExecutionContext
extends BaseExecutionContext<DirectStepContext> {
private final Clock clock;
private final StructuralKey<?> key;
- private final CopyOnAccessInMemoryStateInternals<Object> existingState;
+ private final CopyOnAccessInMemoryStateInternals existingState;
private final TransformWatermarks watermarks;
- public DirectExecutionContext(Clock clock, StructuralKey<?> key,
- CopyOnAccessInMemoryStateInternals<Object> existingState, TransformWatermarks watermarks) {
+ public DirectExecutionContext(
+ Clock clock,
+ StructuralKey<?> key,
+ CopyOnAccessInMemoryStateInternals existingState,
+ TransformWatermarks watermarks) {
this.clock = clock;
this.key = key;
this.existingState = existingState;
@@ -55,7 +58,7 @@ class DirectExecutionContext
*/
public class DirectStepContext
extends BaseExecutionContext.StepContext {
- private CopyOnAccessInMemoryStateInternals<Object> stateInternals;
+ private CopyOnAccessInMemoryStateInternals<?> stateInternals;
private DirectTimerInternals timerInternals;
public DirectStepContext(
@@ -64,7 +67,7 @@ class DirectExecutionContext
}
@Override
- public CopyOnAccessInMemoryStateInternals<Object> stateInternals() {
+ public CopyOnAccessInMemoryStateInternals<?> stateInternals() {
if (stateInternals == null) {
stateInternals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, existingState);
}
@@ -84,7 +87,7 @@ class DirectExecutionContext
* Commits the state of this step, and returns the committed state. If the step has not
* accessed any state, return null.
*/
- public CopyOnAccessInMemoryStateInternals<?> commitState() {
+ public CopyOnAccessInMemoryStateInternals commitState() {
if (stateInternals != null) {
return stateInternals.commit();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index 54ce027..f6d9a36 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -90,7 +90,7 @@ class EvaluationContext {
private final WatermarkCallbackExecutor callbackExecutor;
/** The stateInternals of the world, by applied PTransform and key. */
- private final ConcurrentMap<StepAndKey, CopyOnAccessInMemoryStateInternals<?>>
+ private final ConcurrentMap<StepAndKey, CopyOnAccessInMemoryStateInternals>
applicationStateInternals;
private final SideInputContainer sideInputContainer;
@@ -179,9 +179,9 @@ class EvaluationContext {
result.getAggregatorChanges().commit();
}
// Update state internals
- CopyOnAccessInMemoryStateInternals<?> theirState = result.getState();
+ CopyOnAccessInMemoryStateInternals theirState = result.getState();
if (theirState != null) {
- CopyOnAccessInMemoryStateInternals<?> committedState = theirState.commit();
+ CopyOnAccessInMemoryStateInternals committedState = theirState.commit();
StepAndKey stepAndKey =
StepAndKey.of(
result.getTransform(), completedBundle == null ? null : completedBundle.getKey());
@@ -331,7 +331,7 @@ class EvaluationContext {
return new DirectExecutionContext(
clock,
key,
- (CopyOnAccessInMemoryStateInternals<Object>) applicationStateInternals.get(stepAndKey),
+ (CopyOnAccessInMemoryStateInternals) applicationStateInternals.get(stepAndKey),
watermarkManager.getWatermarks(application));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index f1e29c6..9f567a4 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -164,8 +164,8 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
(PCollection<KV<K, Iterable<V>>>)
Iterables.getOnlyElement(application.getOutputs().values()));
outputBundles.add(bundle);
- CopyOnAccessInMemoryStateInternals<K> stateInternals =
- (CopyOnAccessInMemoryStateInternals<K>) stepContext.stateInternals();
+ CopyOnAccessInMemoryStateInternals stateInternals =
+ (CopyOnAccessInMemoryStateInternals) stepContext.stateInternals();
DirectTimerInternals timerInternals = stepContext.timerInternals();
ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner =
new ReduceFnRunner<>(
@@ -191,7 +191,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
@Override
public TransformResult<KeyedWorkItem<K, V>> finishBundle() throws Exception {
// State is initialized within the constructor. It can never be null.
- CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState();
+ CopyOnAccessInMemoryStateInternals state = stepContext.commitState();
return StepTransformResult.<KeyedWorkItem<K, V>>withHold(
application, state.getEarliestWatermarkHold())
.withState(state)
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index cab11db..053da31 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -211,7 +211,7 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
throw UserCodeException.wrap(e);
}
StepTransformResult.Builder<InputT> resultBuilder;
- CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState();
+ CopyOnAccessInMemoryStateInternals state = stepContext.commitState();
if (state != null) {
resultBuilder =
StepTransformResult.<InputT>withHold(transform, state.getEarliestWatermarkHold())
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index 7efdb52..e0adc40 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -127,7 +127,7 @@ class SplittableProcessElementsEvaluatorFactory<
new StateInternalsFactory<String>() {
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
- public StateInternals<String> stateInternalsForKey(String key) {
+ public StateInternals stateInternalsForKey(String key) {
return (StateInternals) stepContext.stateInternals();
}
});
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index 8793ae8..93ab077 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -175,10 +175,11 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
@Override
public void run() {
for (StateDeclaration stateDecl : signature.stateDeclarations().values()) {
- StateTag<Object, ?> tag;
+ StateTag<?> tag;
try {
tag =
- StateTags.tagForSpec(stateDecl.id(), (StateSpec) stateDecl.field().get(doFn));
+ StateTags.tagForSpec(
+ stateDecl.id(), (StateSpec) stateDecl.field().get(doFn));
} catch (IllegalAccessException e) {
throw new RuntimeException(
String.format(
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
index 01b2a72..fe3ae97 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
@@ -70,7 +70,7 @@ public abstract class StepTransformResult<InputT> implements TransformResult<Inp
private final ImmutableList.Builder<UncommittedBundle<?>> bundlesBuilder;
private final ImmutableList.Builder<WindowedValue<InputT>> unprocessedElementsBuilder;
private MetricUpdates metricUpdates;
- private CopyOnAccessInMemoryStateInternals<?> state;
+ private CopyOnAccessInMemoryStateInternals state;
private TimerUpdate timerUpdate;
private AggregatorContainer.Mutator aggregatorChanges;
private final Set<OutputType> producedOutputs;
@@ -109,7 +109,7 @@ public abstract class StepTransformResult<InputT> implements TransformResult<Inp
return this;
}
- public Builder<InputT> withState(CopyOnAccessInMemoryStateInternals<?> state) {
+ public Builder<InputT> withState(CopyOnAccessInMemoryStateInternals state) {
this.state = state;
return this;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
index 8bb5f93..bde44ca 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
@@ -85,7 +85,7 @@ public interface TransformResult<InputT> {
* <p>If this evaluation did not access state, this may return null.
*/
@Nullable
- CopyOnAccessInMemoryStateInternals<?> getState();
+ CopyOnAccessInMemoryStateInternals getState();
/**
* Returns a TimerUpdateBuilder that was produced as a result of this evaluation. If the
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
index 4d04745..3e29a69 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
@@ -72,7 +72,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+ StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
BagState<String> stringBag = internals.state(namespace, bagTag);
assertThat(stringBag.read(), emptyIterable());
@@ -92,7 +92,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+ StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
BagState<String> stringBag = internals.state(namespace, bagTag);
assertThat(stringBag.read(), emptyIterable());
@@ -114,18 +114,18 @@ public class CopyOnAccessInMemoryStateInternalsTest {
*/
@Test
public void testGetWithPresentInUnderlying() {
- CopyOnAccessInMemoryStateInternals<String> underlying =
+ CopyOnAccessInMemoryStateInternals<String>underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, ValueState<String>> valueTag = StateTags.value("foo", StringUtf8Coder.of());
+ StateTag<ValueState<String>> valueTag = StateTags.value("foo", StringUtf8Coder.of());
ValueState<String> underlyingValue = underlying.state(namespace, valueTag);
assertThat(underlyingValue.read(), nullValue(String.class));
underlyingValue.write("bar");
assertThat(underlyingValue.read(), equalTo("bar"));
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
ValueState<String> copyOnAccessState = internals.state(namespace, valueTag);
assertThat(copyOnAccessState.read(), equalTo("bar"));
@@ -140,18 +140,18 @@ public class CopyOnAccessInMemoryStateInternalsTest {
@Test
public void testBagStateWithUnderlying() {
- CopyOnAccessInMemoryStateInternals<String> underlying =
+ CopyOnAccessInMemoryStateInternals<String>underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, BagState<Integer>> valueTag = StateTags.bag("foo", VarIntCoder.of());
+ StateTag<BagState<Integer>> valueTag = StateTags.bag("foo", VarIntCoder.of());
BagState<Integer> underlyingValue = underlying.state(namespace, valueTag);
assertThat(underlyingValue.read(), emptyIterable());
underlyingValue.add(1);
assertThat(underlyingValue.read(), containsInAnyOrder(1));
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
BagState<Integer> copyOnAccessState = internals.state(namespace, valueTag);
assertThat(copyOnAccessState.read(), containsInAnyOrder(1));
@@ -166,18 +166,18 @@ public class CopyOnAccessInMemoryStateInternalsTest {
@Test
public void testSetStateWithUnderlying() {
- CopyOnAccessInMemoryStateInternals<String> underlying =
+ CopyOnAccessInMemoryStateInternals<String>underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, SetState<Integer>> valueTag = StateTags.set("foo", VarIntCoder.of());
+ StateTag<SetState<Integer>> valueTag = StateTags.set("foo", VarIntCoder.of());
SetState<Integer> underlyingValue = underlying.state(namespace, valueTag);
assertThat(underlyingValue.read(), emptyIterable());
underlyingValue.add(1);
assertThat(underlyingValue.read(), containsInAnyOrder(1));
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
SetState<Integer> copyOnAccessState = internals.state(namespace, valueTag);
assertThat(copyOnAccessState.read(), containsInAnyOrder(1));
@@ -192,11 +192,11 @@ public class CopyOnAccessInMemoryStateInternalsTest {
@Test
public void testMapStateWithUnderlying() {
- CopyOnAccessInMemoryStateInternals<String> underlying =
+ CopyOnAccessInMemoryStateInternals<String>underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, MapState<String, Integer>> valueTag =
+ StateTag<MapState<String, Integer>> valueTag =
StateTags.map("foo", StringUtf8Coder.of(), VarIntCoder.of());
MapState<String, Integer> underlyingValue = underlying.state(namespace, valueTag);
assertThat(underlyingValue.entries().read(), emptyIterable());
@@ -204,7 +204,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
underlyingValue.put("hello", 1);
assertThat(underlyingValue.get("hello").read(), equalTo(1));
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
MapState<String, Integer> copyOnAccessState = internals.state(namespace, valueTag);
assertThat(copyOnAccessState.get("hello").read(), equalTo(1));
@@ -221,13 +221,13 @@ public class CopyOnAccessInMemoryStateInternalsTest {
@Test
public void testAccumulatorCombiningStateWithUnderlying() throws CannotProvideCoderException {
- CopyOnAccessInMemoryStateInternals<String> underlying =
+ CopyOnAccessInMemoryStateInternals<String>underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
CombineFn<Long, long[], Long> sumLongFn = Sum.ofLongs();
StateNamespace namespace = new StateNamespaceForTest("foo");
CoderRegistry reg = pipeline.getCoderRegistry();
- StateTag<Object, CombiningState<Long, long[], Long>> stateTag =
+ StateTag<CombiningState<Long, long[], Long>> stateTag =
StateTags.combiningValue("summer",
sumLongFn.getAccumulatorCoder(reg, reg.getDefaultCoder(Long.class)), sumLongFn);
GroupingState<Long, Long> underlyingValue = underlying.state(namespace, stateTag);
@@ -236,7 +236,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
underlyingValue.add(1L);
assertThat(underlyingValue.read(), equalTo(1L));
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
GroupingState<Long, Long> copyOnAccessState = internals.state(namespace, stateTag);
assertThat(copyOnAccessState.read(), equalTo(1L));
@@ -251,13 +251,13 @@ public class CopyOnAccessInMemoryStateInternalsTest {
@Test
public void testWatermarkHoldStateWithUnderlying() {
- CopyOnAccessInMemoryStateInternals<String> underlying =
+ CopyOnAccessInMemoryStateInternals<String>underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
TimestampCombiner timestampCombiner = TimestampCombiner.EARLIEST;
StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, WatermarkHoldState> stateTag =
+ StateTag<WatermarkHoldState> stateTag =
StateTags.watermarkStateInternal("wmstate", timestampCombiner);
WatermarkHoldState underlyingValue = underlying.state(namespace, stateTag);
assertThat(underlyingValue.read(), nullValue());
@@ -265,7 +265,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
underlyingValue.add(new Instant(250L));
assertThat(underlyingValue.read(), equalTo(new Instant(250L)));
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
WatermarkHoldState copyOnAccessState = internals.state(namespace, stateTag);
assertThat(copyOnAccessState.read(), equalTo(new Instant(250L)));
@@ -284,10 +284,10 @@ public class CopyOnAccessInMemoryStateInternalsTest {
@Test
public void testCommitWithoutUnderlying() {
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+ StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
BagState<String> stringBag = internals.state(namespace, bagTag);
assertThat(stringBag.read(), emptyIterable());
@@ -304,13 +304,13 @@ public class CopyOnAccessInMemoryStateInternalsTest {
@Test
public void testCommitWithUnderlying() {
- CopyOnAccessInMemoryStateInternals<String> underlying =
+ CopyOnAccessInMemoryStateInternals<String>underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+ StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
BagState<String> stringBag = underlying.state(namespace, bagTag);
assertThat(stringBag.read(), emptyIterable());
@@ -331,15 +331,15 @@ public class CopyOnAccessInMemoryStateInternalsTest {
@Test
public void testCommitWithClearedInUnderlying() {
- CopyOnAccessInMemoryStateInternals<String> underlying =
+ CopyOnAccessInMemoryStateInternals<String>underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
- CopyOnAccessInMemoryStateInternals<String> secondUnderlying =
+ CopyOnAccessInMemoryStateInternals<String>secondUnderlying =
spy(CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying));
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, secondUnderlying);
StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+ StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
BagState<String> stringBag = underlying.state(namespace, bagTag);
assertThat(stringBag.read(), emptyIterable());
@@ -361,13 +361,13 @@ public class CopyOnAccessInMemoryStateInternalsTest {
@Test
public void testCommitWithOverwrittenUnderlying() {
- CopyOnAccessInMemoryStateInternals<String> underlying =
+ CopyOnAccessInMemoryStateInternals<String>underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+ StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
BagState<String> stringBag = underlying.state(namespace, bagTag);
assertThat(stringBag.read(), emptyIterable());
@@ -392,15 +392,15 @@ public class CopyOnAccessInMemoryStateInternalsTest {
@Test
public void testCommitWithAddedUnderlying() {
- CopyOnAccessInMemoryStateInternals<String> underlying =
+ CopyOnAccessInMemoryStateInternals<String>underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
internals.commit();
StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+ StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
BagState<String> stringBag = underlying.state(namespace, bagTag);
assertThat(stringBag.read(), emptyIterable());
@@ -416,7 +416,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
@Test
public void testCommitWithEmptyTableIsEmpty() {
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
internals.commit();
@@ -426,11 +426,11 @@ public class CopyOnAccessInMemoryStateInternalsTest {
@Test
public void testCommitWithOnlyClearedValuesIsEmpty() {
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+ StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
BagState<String> stringBag = internals.state(namespace, bagTag);
assertThat(stringBag.read(), emptyIterable());
@@ -444,13 +444,13 @@ public class CopyOnAccessInMemoryStateInternalsTest {
@Test
public void testCommitWithEmptyNewAndFullUnderlyingIsNotEmpty() {
- CopyOnAccessInMemoryStateInternals<String> underlying =
+ CopyOnAccessInMemoryStateInternals<String>underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+ StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
BagState<String> stringBag = underlying.state(namespace, bagTag);
assertThat(stringBag.read(), emptyIterable());
@@ -475,16 +475,16 @@ public class CopyOnAccessInMemoryStateInternalsTest {
return new Instant(689743L);
}
};
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
- StateTag<Object, WatermarkHoldState> firstHoldAddress =
+ StateTag<WatermarkHoldState> firstHoldAddress =
StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
WatermarkHoldState firstHold =
internals.state(StateNamespaces.window(null, first), firstHoldAddress);
firstHold.add(new Instant(22L));
- StateTag<Object, WatermarkHoldState> secondHoldAddress =
+ StateTag<WatermarkHoldState> secondHoldAddress =
StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
WatermarkHoldState secondHold =
internals.state(StateNamespaces.window(null, second), secondHoldAddress);
@@ -508,18 +508,18 @@ public class CopyOnAccessInMemoryStateInternalsTest {
return new Instant(689743L);
}
};
- CopyOnAccessInMemoryStateInternals<String> underlying =
+ CopyOnAccessInMemoryStateInternals<String>underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
- StateTag<Object, WatermarkHoldState> firstHoldAddress =
+ StateTag<WatermarkHoldState> firstHoldAddress =
StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
WatermarkHoldState firstHold =
underlying.state(StateNamespaces.window(null, first), firstHoldAddress);
firstHold.add(new Instant(22L));
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying("foo", underlying.commit());
- StateTag<Object, WatermarkHoldState> secondHoldAddress =
+ StateTag<WatermarkHoldState> secondHoldAddress =
StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
WatermarkHoldState secondHold =
internals.state(StateNamespaces.window(null, second), secondHoldAddress);
@@ -545,18 +545,18 @@ public class CopyOnAccessInMemoryStateInternalsTest {
return new Instant(689743L);
}
};
- CopyOnAccessInMemoryStateInternals<String> underlying =
+ CopyOnAccessInMemoryStateInternals<String>underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
- StateTag<Object, WatermarkHoldState> firstHoldAddress =
+ StateTag<WatermarkHoldState> firstHoldAddress =
StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
WatermarkHoldState firstHold =
underlying.state(StateNamespaces.window(null, first), firstHoldAddress);
firstHold.add(new Instant(224L));
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying("foo", underlying.commit());
- StateTag<Object, WatermarkHoldState> secondHoldAddress =
+ StateTag<WatermarkHoldState> secondHoldAddress =
StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
WatermarkHoldState secondHold =
internals.state(StateNamespaces.window(null, second), secondHoldAddress);
@@ -568,7 +568,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
@Test
public void testGetEarliestHoldBeforeCommit() {
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
internals
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 51ae12a..6f9adc4 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -524,6 +524,7 @@ public class DirectRunnerTest implements Serializable {
}
private static class LongNoDecodeCoder extends CustomCoder<Long> {
+
@Override
public void encode(
Long value, OutputStream outStream, Context context) throws IOException {
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 0c3a8ed..bfbcd79 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -161,7 +161,7 @@ public class EvaluationContextTest {
context.getExecutionContext(createdProducer,
StructuralKey.of("foo", StringUtf8Coder.of()));
- StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
+ StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
DirectStepContext stepContext = fooContext.getOrCreateStepContext("s1", "s1");
stepContext.stateInternals().state(StateNamespaces.global(), intBag).add(1);
@@ -194,7 +194,7 @@ public class EvaluationContextTest {
context.getExecutionContext(createdProducer,
StructuralKey.of("foo", StringUtf8Coder.of()));
- StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
+ StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
fooContext
.getOrCreateStepContext("s1", "s1")
@@ -221,7 +221,7 @@ public class EvaluationContextTest {
DirectExecutionContext fooContext =
context.getExecutionContext(createdProducer, myKey);
- StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
+ StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
fooContext
.getOrCreateStepContext("s1", "s1")
@@ -246,9 +246,9 @@ public class EvaluationContextTest {
DirectExecutionContext fooContext =
context.getExecutionContext(downstreamProducer, myKey);
- StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
+ StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
- CopyOnAccessInMemoryStateInternals<Object> state =
+ CopyOnAccessInMemoryStateInternals<?> state =
fooContext.getOrCreateStepContext("s1", "s1").stateInternals();
BagState<Integer> bag = state.state(StateNamespaces.global(), intBag);
bag.add(1);
@@ -268,7 +268,7 @@ public class EvaluationContextTest {
DirectExecutionContext afterResultContext =
context.getExecutionContext(downstreamProducer, myKey);
- CopyOnAccessInMemoryStateInternals<Object> afterResultState =
+ CopyOnAccessInMemoryStateInternals<?> afterResultState =
afterResultContext.getOrCreateStepContext("s1", "s1").stateInternals();
assertThat(afterResultState.state(StateNamespaces.global(), intBag).read(), contains(1, 2, 4));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
index ecb8130..fc63406 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
@@ -92,7 +92,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
@Mock private transient UncommittedBundle<Integer> mockUncommittedBundle;
private static final String KEY = "any-key";
- private transient StateInternals<Object> stateInternals =
+ private transient StateInternals stateInternals =
CopyOnAccessInMemoryStateInternals.<Object>withUnderlying(KEY, null);
private static final BundleFactory BUNDLE_FACTORY = ImmutableListBundleFactory.create();
@@ -104,7 +104,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
- when((StateInternals<Object>) mockStepContext.stateInternals()).thenReturn(stateInternals);
+ when((StateInternals) mockStepContext.stateInternals()).thenReturn(stateInternals);
when(mockEvaluationContext.createSideInputReader(anyList()))
.thenReturn(
SideInputContainer.create(
@@ -133,7 +133,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
ParDo.of(
new DoFn<KV<String, Integer>, Integer>() {
@StateId(stateId)
- private final StateSpec<Object, ValueState<String>> spec =
+ private final StateSpec<ValueState<String>> spec =
StateSpecs.value(StringUtf8Coder.of());
@ProcessElement
@@ -165,7 +165,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
StateNamespaces.window(IntervalWindow.getCoder(), firstWindow);
StateNamespace secondWindowNamespace =
StateNamespaces.window(IntervalWindow.getCoder(), secondWindow);
- StateTag<Object, ValueState<String>> tag =
+ StateTag<ValueState<String>> tag =
StateTags.tagForSpec(stateId, StateSpecs.value(StringUtf8Coder.of()));
// Set up non-empty state. We don't mock + verify calls to clear() but instead
@@ -247,7 +247,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
.of(
new DoFn<KV<String, Integer>, Integer>() {
@StateId(stateId)
- private final StateSpec<Object, ValueState<String>> spec =
+ private final StateSpec<ValueState<String>> spec =
StateSpecs.value(StringUtf8Coder.of());
@ProcessElement
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
index 847a00a..8640801 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
@@ -61,7 +61,7 @@ public class FlinkNoOpStepContext implements StepContext {
}
@Override
- public StateInternals<?> stateInternals() {
+ public StateInternals stateInternals() {
return null;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
index 879fad7..a79f856 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
@@ -123,7 +123,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
Collections.<TupleTag<?>>emptyList(),
new FlinkNoOpStepContext() {
@Override
- public StateInternals<?> stateInternals() {
+ public StateInternals stateInternals() {
return stateInternals;
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 01830de..d8fd79a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -133,7 +133,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
protected transient long currentOutputWatermark;
- private transient StateTag<Object, BagState<WindowedValue<InputT>>> pushedBackTag;
+ private transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag;
protected transient FlinkStateInternals<?> stateInternals;
@@ -149,7 +149,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
protected transient FlinkTimerInternals timerInternals;
- private transient StateInternals<?> pushbackStateInternals;
+ private transient StateInternals pushbackStateInternals;
private transient Optional<Long> pushedBackWatermark;
@@ -673,7 +673,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
}
@Override
- public StateInternals<?> stateInternals() {
+ public StateInternals stateInternals() {
return stateInternals;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
index fb6762d..1887a99 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -94,10 +94,10 @@ public class SplittableDoFnOperator<
StateInternalsFactory<String> stateInternalsFactory = new StateInternalsFactory<String>() {
@Override
- public StateInternals<String> stateInternalsForKey(String key) {
+ public StateInternals stateInternalsForKey(String key) {
//this will implicitly be keyed by the key of the incoming
// element or by the key of a firing timer
- return (StateInternals<String>) stateInternals;
+ return (StateInternals) stateInternals;
}
};
TimerInternalsFactory<String> timerInternalsFactory = new TimerInternalsFactory<String>() {
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 9718734..3899303 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -83,10 +83,10 @@ public class WindowDoFnOperator<K, InputT, OutputT>
protected DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getDoFn() {
StateInternalsFactory<K> stateInternalsFactory = new StateInternalsFactory<K>() {
@Override
- public StateInternals<K> stateInternalsForKey(K key) {
+ public StateInternals stateInternalsForKey(K key) {
//this will implicitly be keyed by the key of the incoming
// element or by the key of a firing timer
- return (StateInternals<K>) stateInternals;
+ return (StateInternals) stateInternals;
}
};
TimerInternalsFactory<K> timerInternalsFactory = new TimerInternalsFactory<K>() {