You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/23 18:18:14 UTC

[15/16] beam git commit: Remove unused pieces of DirectStepContext

Remove unused pieces of DirectStepContext


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b32a1c35
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b32a1c35
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b32a1c35

Branch: refs/heads/master
Commit: b32a1c350398a91b1b1552d5257dab6ab7d1da3a
Parents: d425b27
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue May 23 11:13:19 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 23 11:16:42 2017 -0700

----------------------------------------------------------------------
 .../runners/direct/DirectExecutionContext.java    | 18 +++++-------------
 .../direct/GroupAlsoByWindowEvaluatorFactory.java |  2 +-
 .../runners/direct/ParDoEvaluatorFactory.java     |  2 +-
 ...SplittableProcessElementsEvaluatorFactory.java |  2 +-
 .../direct/StatefulParDoEvaluatorFactory.java     |  2 +-
 .../runners/direct/EvaluationContextTest.java     | 16 ++++++++--------
 .../beam/runners/direct/ParDoEvaluatorTest.java   |  2 +-
 .../direct/StatefulParDoEvaluatorFactoryTest.java |  4 ++--
 8 files changed, 20 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b32a1c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
index 11c1b86..e8ad8d7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
@@ -48,19 +48,17 @@ class DirectExecutionContext {
     this.watermarks = watermarks;
   }
 
-  private DirectStepContext createStepContext(String stepName, String transformName) {
-    return new DirectStepContext(stepName, transformName);
+  private DirectStepContext createStepContext() {
+    return new DirectStepContext();
   }
 
   /**
    * Returns the {@link StepContext} associated with the given step.
    */
-  public DirectStepContext getStepContext(String stepName, String transformName) {
-    final String finalStepName = stepName;
-    final String finalTransformName = transformName;
+  public DirectStepContext getStepContext(String stepName) {
     DirectStepContext context = cachedStepContexts.get(stepName);
     if (context == null) {
-      context = createStepContext(finalStepName, finalTransformName);
+      context = createStepContext();
       cachedStepContexts.put(stepName, context);
     }
     return context;
@@ -72,14 +70,8 @@ class DirectExecutionContext {
   public class DirectStepContext implements StepContext {
     private CopyOnAccessInMemoryStateInternals<?> stateInternals;
     private DirectTimerInternals timerInternals;
-    private final String stepName;
-    private final String transformName;
 
-    public DirectStepContext(
-        String stepName, String transformName) {
-      this.stepName = stepName;
-      this.transformName = transformName;
-    }
+    public DirectStepContext() { }
 
     @Override
     public CopyOnAccessInMemoryStateInternals<?> stateInternals() {

http://git-wip-us.apache.org/repos/asf/beam/blob/b32a1c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 49b7512..1a588ee 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -130,7 +130,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
       stepContext = evaluationContext
           .getExecutionContext(application, inputBundle.getKey())
           .getStepContext(
-              evaluationContext.getStepName(application), application.getTransform().getName());
+              evaluationContext.getStepName(application));
       windowingStrategy =
           (WindowingStrategy<?, BoundedWindow>)
               application.getTransform().getInputWindowingStrategy();

http://git-wip-us.apache.org/repos/asf/beam/blob/b32a1c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index 12c6751..8aa75cf 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -112,7 +112,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
     DirectStepContext stepContext =
         evaluationContext
             .getExecutionContext(application, inputBundleKey)
-            .getStepContext(stepName, stepName);
+            .getStepContext(stepName);
 
     DoFnLifecycleManager fnManager = fnClones.getUnchecked(doFn);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/b32a1c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index 13d9345..b85f481c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -109,7 +109,7 @@ class SplittableProcessElementsEvaluatorFactory<
     final DirectExecutionContext.DirectStepContext stepContext =
         evaluationContext
             .getExecutionContext(application, inputBundle.getKey())
-            .getStepContext(stepName, stepName);
+            .getStepContext(stepName);
 
     final ParDoEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
         parDoEvaluator =

http://git-wip-us.apache.org/repos/asf/beam/blob/b32a1c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index 70d0cf5..506c84c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -163,7 +163,7 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
           evaluationContext
               .getExecutionContext(
                   transformOutputWindow.getTransform(), transformOutputWindow.getKey())
-              .getStepContext(stepName, stepName);
+              .getStepContext(stepName);
 
       final StateNamespace namespace =
           StateNamespaces.window(

http://git-wip-us.apache.org/repos/asf/beam/blob/b32a1c35/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 0e2be8d..80b04f8 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -160,7 +160,7 @@ public class EvaluationContextTest {
 
     StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
 
-    DirectStepContext stepContext = fooContext.getStepContext("s1", "s1");
+    DirectStepContext stepContext = fooContext.getStepContext("s1");
     stepContext.stateInternals().state(StateNamespaces.global(), intBag).add(1);
 
     context.handleResult(
@@ -177,7 +177,7 @@ public class EvaluationContextTest {
             StructuralKey.of("foo", StringUtf8Coder.of()));
     assertThat(
         secondFooContext
-            .getStepContext("s1", "s1")
+            .getStepContext("s1")
             .stateInternals()
             .state(StateNamespaces.global(), intBag)
             .read(),
@@ -194,7 +194,7 @@ public class EvaluationContextTest {
     StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
 
     fooContext
-        .getStepContext("s1", "s1")
+        .getStepContext("s1")
         .stateInternals()
         .state(StateNamespaces.global(), intBag)
         .add(1);
@@ -205,7 +205,7 @@ public class EvaluationContextTest {
     assertThat(barContext, not(equalTo(fooContext)));
     assertThat(
         barContext
-            .getStepContext("s1", "s1")
+            .getStepContext("s1")
             .stateInternals()
             .state(StateNamespaces.global(), intBag)
             .read(),
@@ -221,7 +221,7 @@ public class EvaluationContextTest {
     StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
 
     fooContext
-        .getStepContext("s1", "s1")
+        .getStepContext("s1")
         .stateInternals()
         .state(StateNamespaces.global(), intBag)
         .add(1);
@@ -230,7 +230,7 @@ public class EvaluationContextTest {
         context.getExecutionContext(downstreamProducer, myKey);
     assertThat(
         barContext
-            .getStepContext("s1", "s1")
+            .getStepContext("s1")
             .stateInternals()
             .state(StateNamespaces.global(), intBag)
             .read(),
@@ -246,7 +246,7 @@ public class EvaluationContextTest {
     StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
 
     CopyOnAccessInMemoryStateInternals<?> state =
-        fooContext.getStepContext("s1", "s1").stateInternals();
+        fooContext.getStepContext("s1").stateInternals();
     BagState<Integer> bag = state.state(StateNamespaces.global(), intBag);
     bag.add(1);
     bag.add(2);
@@ -266,7 +266,7 @@ public class EvaluationContextTest {
         context.getExecutionContext(downstreamProducer, myKey);
 
     CopyOnAccessInMemoryStateInternals<?> afterResultState =
-        afterResultContext.getStepContext("s1", "s1").stateInternals();
+        afterResultContext.getStepContext("s1").stateInternals();
     assertThat(afterResultState.state(StateNamespaces.global(), intBag).read(), contains(1, 2, 4));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/b32a1c35/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 22b3b7e..09a21ac 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -141,7 +141,7 @@ public class ParDoEvaluatorTest {
     DirectStepContext stepContext = mock(DirectStepContext.class);
     when(
             executionContext.getStepContext(
-                Mockito.any(String.class), Mockito.any(String.class)))
+                Mockito.any(String.class)))
         .thenReturn(stepContext);
     when(stepContext.getTimerUpdate()).thenReturn(TimerUpdate.empty());
     when(

http://git-wip-us.apache.org/repos/asf/beam/blob/b32a1c35/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
index b233c1b..9366b7c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
@@ -153,7 +153,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
     when(mockEvaluationContext.getExecutionContext(
             eq(producingTransform), Mockito.<StructuralKey>any()))
         .thenReturn(mockExecutionContext);
-    when(mockExecutionContext.getStepContext(anyString(), anyString()))
+    when(mockExecutionContext.getStepContext(anyString()))
         .thenReturn(mockStepContext);
 
     IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(9));
@@ -269,7 +269,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
     when(mockEvaluationContext.getExecutionContext(
             eq(producingTransform), Mockito.<StructuralKey>any()))
         .thenReturn(mockExecutionContext);
-    when(mockExecutionContext.getStepContext(anyString(), anyString()))
+    when(mockExecutionContext.getStepContext(anyString()))
         .thenReturn(mockStepContext);
     when(mockEvaluationContext.createBundle(Matchers.<PCollection<Integer>>any()))
         .thenReturn(mockUncommittedBundle);