You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/23 18:18:14 UTC
[15/16] beam git commit: Remove unused pieces of DirectStepContext
Remove unused pieces of DirectStepContext
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b32a1c35
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b32a1c35
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b32a1c35
Branch: refs/heads/master
Commit: b32a1c350398a91b1b1552d5257dab6ab7d1da3a
Parents: d425b27
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue May 23 11:13:19 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 23 11:16:42 2017 -0700
----------------------------------------------------------------------
.../runners/direct/DirectExecutionContext.java | 18 +++++-------------
.../direct/GroupAlsoByWindowEvaluatorFactory.java | 2 +-
.../runners/direct/ParDoEvaluatorFactory.java | 2 +-
...SplittableProcessElementsEvaluatorFactory.java | 2 +-
.../direct/StatefulParDoEvaluatorFactory.java | 2 +-
.../runners/direct/EvaluationContextTest.java | 16 ++++++++--------
.../beam/runners/direct/ParDoEvaluatorTest.java | 2 +-
.../direct/StatefulParDoEvaluatorFactoryTest.java | 4 ++--
8 files changed, 20 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b32a1c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
index 11c1b86..e8ad8d7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
@@ -48,19 +48,17 @@ class DirectExecutionContext {
this.watermarks = watermarks;
}
- private DirectStepContext createStepContext(String stepName, String transformName) {
- return new DirectStepContext(stepName, transformName);
+ private DirectStepContext createStepContext() {
+ return new DirectStepContext();
}
/**
* Returns the {@link StepContext} associated with the given step.
*/
- public DirectStepContext getStepContext(String stepName, String transformName) {
- final String finalStepName = stepName;
- final String finalTransformName = transformName;
+ public DirectStepContext getStepContext(String stepName) {
DirectStepContext context = cachedStepContexts.get(stepName);
if (context == null) {
- context = createStepContext(finalStepName, finalTransformName);
+ context = createStepContext();
cachedStepContexts.put(stepName, context);
}
return context;
@@ -72,14 +70,8 @@ class DirectExecutionContext {
public class DirectStepContext implements StepContext {
private CopyOnAccessInMemoryStateInternals<?> stateInternals;
private DirectTimerInternals timerInternals;
- private final String stepName;
- private final String transformName;
- public DirectStepContext(
- String stepName, String transformName) {
- this.stepName = stepName;
- this.transformName = transformName;
- }
+ public DirectStepContext() { }
@Override
public CopyOnAccessInMemoryStateInternals<?> stateInternals() {
http://git-wip-us.apache.org/repos/asf/beam/blob/b32a1c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 49b7512..1a588ee 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -130,7 +130,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
stepContext = evaluationContext
.getExecutionContext(application, inputBundle.getKey())
.getStepContext(
- evaluationContext.getStepName(application), application.getTransform().getName());
+ evaluationContext.getStepName(application));
windowingStrategy =
(WindowingStrategy<?, BoundedWindow>)
application.getTransform().getInputWindowingStrategy();
http://git-wip-us.apache.org/repos/asf/beam/blob/b32a1c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index 12c6751..8aa75cf 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -112,7 +112,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
DirectStepContext stepContext =
evaluationContext
.getExecutionContext(application, inputBundleKey)
- .getStepContext(stepName, stepName);
+ .getStepContext(stepName);
DoFnLifecycleManager fnManager = fnClones.getUnchecked(doFn);
http://git-wip-us.apache.org/repos/asf/beam/blob/b32a1c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index 13d9345..b85f481c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -109,7 +109,7 @@ class SplittableProcessElementsEvaluatorFactory<
final DirectExecutionContext.DirectStepContext stepContext =
evaluationContext
.getExecutionContext(application, inputBundle.getKey())
- .getStepContext(stepName, stepName);
+ .getStepContext(stepName);
final ParDoEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
parDoEvaluator =
http://git-wip-us.apache.org/repos/asf/beam/blob/b32a1c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index 70d0cf5..506c84c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -163,7 +163,7 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
evaluationContext
.getExecutionContext(
transformOutputWindow.getTransform(), transformOutputWindow.getKey())
- .getStepContext(stepName, stepName);
+ .getStepContext(stepName);
final StateNamespace namespace =
StateNamespaces.window(
http://git-wip-us.apache.org/repos/asf/beam/blob/b32a1c35/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 0e2be8d..80b04f8 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -160,7 +160,7 @@ public class EvaluationContextTest {
StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
- DirectStepContext stepContext = fooContext.getStepContext("s1", "s1");
+ DirectStepContext stepContext = fooContext.getStepContext("s1");
stepContext.stateInternals().state(StateNamespaces.global(), intBag).add(1);
context.handleResult(
@@ -177,7 +177,7 @@ public class EvaluationContextTest {
StructuralKey.of("foo", StringUtf8Coder.of()));
assertThat(
secondFooContext
- .getStepContext("s1", "s1")
+ .getStepContext("s1")
.stateInternals()
.state(StateNamespaces.global(), intBag)
.read(),
@@ -194,7 +194,7 @@ public class EvaluationContextTest {
StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
fooContext
- .getStepContext("s1", "s1")
+ .getStepContext("s1")
.stateInternals()
.state(StateNamespaces.global(), intBag)
.add(1);
@@ -205,7 +205,7 @@ public class EvaluationContextTest {
assertThat(barContext, not(equalTo(fooContext)));
assertThat(
barContext
- .getStepContext("s1", "s1")
+ .getStepContext("s1")
.stateInternals()
.state(StateNamespaces.global(), intBag)
.read(),
@@ -221,7 +221,7 @@ public class EvaluationContextTest {
StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
fooContext
- .getStepContext("s1", "s1")
+ .getStepContext("s1")
.stateInternals()
.state(StateNamespaces.global(), intBag)
.add(1);
@@ -230,7 +230,7 @@ public class EvaluationContextTest {
context.getExecutionContext(downstreamProducer, myKey);
assertThat(
barContext
- .getStepContext("s1", "s1")
+ .getStepContext("s1")
.stateInternals()
.state(StateNamespaces.global(), intBag)
.read(),
@@ -246,7 +246,7 @@ public class EvaluationContextTest {
StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
CopyOnAccessInMemoryStateInternals<?> state =
- fooContext.getStepContext("s1", "s1").stateInternals();
+ fooContext.getStepContext("s1").stateInternals();
BagState<Integer> bag = state.state(StateNamespaces.global(), intBag);
bag.add(1);
bag.add(2);
@@ -266,7 +266,7 @@ public class EvaluationContextTest {
context.getExecutionContext(downstreamProducer, myKey);
CopyOnAccessInMemoryStateInternals<?> afterResultState =
- afterResultContext.getStepContext("s1", "s1").stateInternals();
+ afterResultContext.getStepContext("s1").stateInternals();
assertThat(afterResultState.state(StateNamespaces.global(), intBag).read(), contains(1, 2, 4));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b32a1c35/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 22b3b7e..09a21ac 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -141,7 +141,7 @@ public class ParDoEvaluatorTest {
DirectStepContext stepContext = mock(DirectStepContext.class);
when(
executionContext.getStepContext(
- Mockito.any(String.class), Mockito.any(String.class)))
+ Mockito.any(String.class)))
.thenReturn(stepContext);
when(stepContext.getTimerUpdate()).thenReturn(TimerUpdate.empty());
when(
http://git-wip-us.apache.org/repos/asf/beam/blob/b32a1c35/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
index b233c1b..9366b7c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
@@ -153,7 +153,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
when(mockEvaluationContext.getExecutionContext(
eq(producingTransform), Mockito.<StructuralKey>any()))
.thenReturn(mockExecutionContext);
- when(mockExecutionContext.getStepContext(anyString(), anyString()))
+ when(mockExecutionContext.getStepContext(anyString()))
.thenReturn(mockStepContext);
IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(9));
@@ -269,7 +269,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
when(mockEvaluationContext.getExecutionContext(
eq(producingTransform), Mockito.<StructuralKey>any()))
.thenReturn(mockExecutionContext);
- when(mockExecutionContext.getStepContext(anyString(), anyString()))
+ when(mockExecutionContext.getStepContext(anyString()))
.thenReturn(mockStepContext);
when(mockEvaluationContext.createBundle(Matchers.<PCollection<Integer>>any()))
.thenReturn(mockUncommittedBundle);