You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/23 18:18:00 UTC

[01/16] beam git commit: Move StepContext to top level

Repository: beam
Updated Branches:
  refs/heads/master 474345f59 -> c84d3da38


Move StepContext to top level


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/98a75551
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/98a75551
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/98a75551

Branch: refs/heads/master
Commit: 98a75551064c742d108d8c5ec8fc0783db7761d2
Parents: 474345f
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon May 22 15:28:44 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 23 11:16:26 2017 -0700

----------------------------------------------------------------------
 .../apex/translation/utils/NoOpStepContext.java |  6 +-
 .../beam/runners/core/BaseExecutionContext.java |  8 +--
 .../apache/beam/runners/core/DoFnRunners.java   |  1 -
 .../beam/runners/core/ExecutionContext.java     | 47 -------------
 .../beam/runners/core/SimpleDoFnRunner.java     |  1 -
 .../apache/beam/runners/core/StepContext.java   | 70 ++++++++++++++++++++
 .../functions/FlinkNoOpStepContext.java         |  2 +-
 .../wrappers/streaming/DoFnOperator.java        |  7 +-
 .../spark/translation/SparkProcessContext.java  |  2 +-
 .../beam/fn/harness/fake/FakeStepContext.java   |  2 +-
 10 files changed, 83 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/98a75551/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
index 721eecd..241a985 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
@@ -19,8 +19,8 @@ package org.apache.beam.runners.apex.translation.utils;
 
 import java.io.IOException;
 import java.io.Serializable;
-import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -28,9 +28,9 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
 
 /**
- * Serializable {@link ExecutionContext.StepContext} that does nothing.
+ * Serializable {@link StepContext} that does nothing.
  */
-public class NoOpStepContext implements ExecutionContext.StepContext, Serializable {
+public class NoOpStepContext implements StepContext, Serializable {
   private static final long serialVersionUID = 1L;
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/98a75551/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
index 23d61f8..ed37143 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
@@ -49,7 +49,7 @@ import org.apache.beam.sdk.values.TupleTag;
  * {@link #getOrCreateStepContext(String, String)}, and {@link #getAllStepContexts()}
  * will be appropriately specialized.
  */
-public abstract class BaseExecutionContext<T extends ExecutionContext.StepContext>
+public abstract class BaseExecutionContext<T extends StepContext>
     implements ExecutionContext {
 
   private Map<String, T> cachedStepContexts = new LinkedHashMap<>();
@@ -81,7 +81,7 @@ public abstract class BaseExecutionContext<T extends ExecutionContext.StepContex
    * Factory method interface to create an execution context if none exists during
    * {@link #getOrCreateStepContext(String, CreateStepContextFunction)}.
    */
-  protected interface CreateStepContextFunction<T extends ExecutionContext.StepContext> {
+  protected interface CreateStepContextFunction<T extends org.apache.beam.runners.core.StepContext> {
     T create();
   }
 
@@ -111,12 +111,12 @@ public abstract class BaseExecutionContext<T extends ExecutionContext.StepContex
   public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) {}
 
   /**
-   * Base class for implementations of {@link ExecutionContext.StepContext}.
+   * Base class for implementations of {@link org.apache.beam.runners.core.StepContext}.
    *
    * <p>To complete a concrete subclass, implement {@link #timerInternals} and
    * {@link #stateInternals}.
    */
-  public abstract static class StepContext implements ExecutionContext.StepContext {
+  public abstract static class StepContext implements org.apache.beam.runners.core.StepContext {
     private final ExecutionContext executionContext;
     private final String stepName;
     private final String transformName;

http://git-wip-us.apache.org/repos/asf/beam/blob/98a75551/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index 71dfd11..9d3e25d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.core;
 
 import java.util.Collection;
 import java.util.List;
-import org.apache.beam.runners.core.ExecutionContext.StepContext;
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessFn;
 import org.apache.beam.runners.core.StatefulDoFnRunner.CleanupTimer;
 import org.apache.beam.runners.core.StatefulDoFnRunner.StateCleaner;

http://git-wip-us.apache.org/repos/asf/beam/blob/98a75551/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
index d2fdaac..f431c92 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
@@ -17,11 +17,8 @@
  */
 package org.apache.beam.runners.core;
 
-import java.io.IOException;
 import java.util.Collection;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn.WindowedContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
 
@@ -52,48 +49,4 @@ public interface ExecutionContext {
    */
   void noteOutput(TupleTag<?> tag, WindowedValue<?> output);
 
-  /**
-   * Per-step, per-key context used for retrieving state.
-   */
-  public interface StepContext {
-
-    /**
-     * The name of the step.
-     */
-    String getStepName();
-
-    /**
-     * The name of the transform for the step.
-     */
-    String getTransformName();
-
-    /**
-     * Hook for subclasses to implement that will be called whenever
-     * {@link WindowedContext#output}
-     * is called.
-     */
-    void noteOutput(WindowedValue<?> output);
-
-    /**
-     * Hook for subclasses to implement that will be called whenever
-     * {@link WindowedContext#output}
-     * is called.
-     */
-    void noteOutput(TupleTag<?> tag, WindowedValue<?> output);
-
-    /**
-     * Writes the given {@code PCollectionView} data to a globally accessible location.
-     */
-    <T, W extends BoundedWindow> void writePCollectionViewData(
-        TupleTag<?> tag,
-        Iterable<WindowedValue<T>> data,
-        Coder<Iterable<WindowedValue<T>>> dataCoder,
-        W window,
-        Coder<W> windowCoder)
-            throws IOException;
-
-    StateInternals stateInternals();
-
-    TimerInternals timerInternals();
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/98a75551/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 65384da..adbe62e 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -29,7 +29,6 @@ import java.util.List;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
-import org.apache.beam.runners.core.ExecutionContext.StepContext;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.State;

http://git-wip-us.apache.org/repos/asf/beam/blob/98a75551/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java
new file mode 100644
index 0000000..a414830
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.io.IOException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn.WindowedContext;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Per-step, per-key context used for retrieving state.
+ */
+public interface StepContext {
+
+  /**
+   * The name of the step.
+   */
+  String getStepName();
+
+  /**
+   * The name of the transform for the step.
+   */
+  String getTransformName();
+
+  /**
+   * Hook for subclasses to implement that will be called whenever
+   * {@link WindowedContext#output}
+   * is called.
+   */
+  void noteOutput(WindowedValue<?> output);
+
+  /**
+   * Hook for subclasses to implement that will be called whenever
+   * {@link WindowedContext#output}
+   * is called.
+   */
+  void noteOutput(TupleTag<?> tag, WindowedValue<?> output);
+
+  /**
+   * Writes the given {@code PCollectionView} data to a globally accessible location.
+   */
+  <T, W extends BoundedWindow> void writePCollectionViewData(
+      TupleTag<?> tag,
+      Iterable<WindowedValue<T>> data,
+      Coder<Iterable<WindowedValue<T>>> dataCoder,
+      W window,
+      Coder<W> windowCoder)
+          throws IOException;
+
+  StateInternals stateInternals();
+
+  TimerInternals timerInternals();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/98a75551/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
index 8640801..c394ebd 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
@@ -18,7 +18,7 @@
 package org.apache.beam.runners.flink.translation.functions;
 
 import java.io.IOException;
-import org.apache.beam.runners.core.ExecutionContext.StepContext;
+import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.sdk.coders.Coder;

http://git-wip-us.apache.org/repos/asf/beam/blob/98a75551/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index f35ba7a..c9f106a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -32,7 +32,6 @@ import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
-import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
 import org.apache.beam.runners.core.NullSideInputReader;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
@@ -184,7 +183,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
         TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
   }
 
-  private ExecutionContext.StepContext createStepContext() {
+  private org.apache.beam.runners.core.StepContext createStepContext() {
     return new StepContext();
   }
 
@@ -250,7 +249,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
     doFnInvoker.invokeSetup();
 
-    ExecutionContext.StepContext stepContext = createStepContext();
+    org.apache.beam.runners.core.StepContext stepContext = createStepContext();
 
     doFnRunner = DoFnRunners.simpleRunner(
         serializedOptions.getPipelineOptions(),
@@ -676,7 +675,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
    * {@link StepContext} for running {@link DoFn DoFns} on Flink. This does not allow
    * accessing state or timer internals.
    */
-  protected class StepContext implements ExecutionContext.StepContext {
+  protected class StepContext implements org.apache.beam.runners.core.StepContext {
 
     @Override
     public String getStepName() {

http://git-wip-us.apache.org/repos/asf/beam/blob/98a75551/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
index ffe343b..9147422 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
@@ -24,7 +24,7 @@ import java.io.IOException;
 import java.util.Iterator;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
-import org.apache.beam.runners.core.ExecutionContext.StepContext;
+import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.sdk.coders.Coder;

http://git-wip-us.apache.org/repos/asf/beam/blob/98a75551/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
index 9b79d11..b206bc7 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
@@ -19,7 +19,7 @@
 package org.apache.beam.fn.harness.fake;
 
 import java.io.IOException;
-import org.apache.beam.runners.core.ExecutionContext.StepContext;
+import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.sdk.coders.Coder;


[16/16] beam git commit: This closes #3203: Eliminate ExecutionContext and strip down StepContext to what we actually need

Posted by ke...@apache.org.
This closes #3203: Eliminate ExecutionContext and strip down StepContext to what we actually need

  Remove unused pieces of DirectStepContext
  Revise StepContext javadoc
  Shorten excessive name in DirectExecutionContext
  Delete unused BaseStepContext
  Delete unused remnants in DirectExecutionContext
  Remove unused StepContext name methods
  Delete unused ExecutionContext
  Inline and delete BaseExecutionContext
  Remove writePCollectionViewData from the Beam codebase
  Implement StepContext directly in the DirectRunner
  Remove extraneous ExecutionContext parameter to BaseStepContext
  Move BaseStepContext to the top level
  Rename BaseExecutionContext.StepContext to BaseStepContext
  Remove StepContext.noteOutput
  Move StepContext to top level


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c84d3da3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c84d3da3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c84d3da3

Branch: refs/heads/master
Commit: c84d3da386e2b5a8d12ac02d44c50c1e102508c4
Parents: 474345f b32a1c3
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue May 23 11:17:35 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 23 11:17:35 2017 -0700

----------------------------------------------------------------------
 .../apex/translation/utils/NoOpStepContext.java |  37 +----
 .../beam/runners/core/BaseExecutionContext.java | 164 -------------------
 .../apache/beam/runners/core/DoFnRunners.java   |   1 -
 .../beam/runners/core/ExecutionContext.java     |  99 -----------
 .../beam/runners/core/SimpleDoFnRunner.java     |   7 -
 .../apache/beam/runners/core/StepContext.java   |  33 ++++
 .../beam/runners/core/SimpleDoFnRunnerTest.java |   4 +-
 .../runners/core/StatefulDoFnRunnerTest.java    |   4 +-
 .../runners/direct/DirectExecutionContext.java  |  35 ++--
 .../beam/runners/direct/EvaluationContext.java  |  29 ++--
 .../GroupAlsoByWindowEvaluatorFactory.java      |   4 +-
 .../runners/direct/ParDoEvaluatorFactory.java   |   2 +-
 ...littableProcessElementsEvaluatorFactory.java |   2 +-
 .../direct/StatefulParDoEvaluatorFactory.java   |   2 +-
 .../runners/direct/EvaluationContextTest.java   |  16 +-
 .../beam/runners/direct/ParDoEvaluatorTest.java |   4 +-
 .../StatefulParDoEvaluatorFactoryTest.java      |   4 +-
 .../functions/FlinkNoOpStepContext.java         |  36 +---
 .../wrappers/streaming/DoFnOperator.java        |  34 +---
 .../spark/translation/SparkProcessContext.java  |  29 +---
 .../beam/fn/harness/fake/FakeStepContext.java   |  33 +---
 21 files changed, 97 insertions(+), 482 deletions(-)
----------------------------------------------------------------------



[12/16] beam git commit: Delete unused ExecutionContext

Posted by ke...@apache.org.
Delete unused ExecutionContext


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/97c230af
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/97c230af
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/97c230af

Branch: refs/heads/master
Commit: 97c230af62151fdbe06ac622282d69c74db30b2f
Parents: 0be3cf3
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon May 22 17:33:38 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 23 11:16:27 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/ExecutionContext.java     | 36 --------------------
 .../runners/direct/DirectExecutionContext.java  |  3 +-
 .../beam/runners/direct/EvaluationContext.java  | 29 +++++++---------
 3 files changed, 14 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/97c230af/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
deleted file mode 100644
index eac3599..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import java.util.Collection;
-
-/**
- * Context for the current execution. This is guaranteed to exist during processing,
- * but does not necessarily persist between different batches of work.
- */
-public interface ExecutionContext {
-  /**
-   * Returns the {@link StepContext} associated with the given step.
-   */
-  StepContext getOrCreateStepContext(String stepName, String transformName);
-
-  /**
-   * Returns a collection view of all of the {@link StepContext}s.
-   */
-  Collection<? extends StepContext> getAllStepContexts();
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/97c230af/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
index 9b68662..05dbebc 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
@@ -22,7 +22,6 @@ import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import org.apache.beam.runners.core.BaseStepContext;
-import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
@@ -34,7 +33,7 @@ import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
  * <p>This implementation is not thread safe. A new {@link DirectExecutionContext} must be created
  * for each thread that requires it.
  */
-class DirectExecutionContext implements ExecutionContext {
+class DirectExecutionContext {
   private final Clock clock;
   private final StructuralKey<?> key;
   private final CopyOnAccessInMemoryStateInternals existingState;

http://git-wip-us.apache.org/repos/asf/beam/blob/97c230af/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index c627119..88ce85a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -31,7 +31,6 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import javax.annotation.Nullable;
-import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
 import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
@@ -52,22 +51,20 @@ import org.apache.beam.sdk.values.WindowingStrategy;
 import org.joda.time.Instant;
 
 /**
- * The evaluation context for a specific pipeline being executed by the
- * {@link DirectRunner}. Contains state shared within the execution across all
- * transforms.
+ * The evaluation context for a specific pipeline being executed by the {@link DirectRunner}.
+ * Contains state shared within the execution across all transforms.
  *
- * <p>{@link EvaluationContext} contains shared state for an execution of the
- * {@link DirectRunner} that can be used while evaluating a {@link PTransform}. This
- * consists of views into underlying state and watermark implementations, access to read and write
- * {@link PCollectionView PCollectionViews}, and managing the
- * {@link ExecutionContext ExecutionContexts}. This includes executing callbacks asynchronously when
- * state changes to the appropriate point (e.g. when a {@link PCollectionView} is requested and
- * known to be empty).
+ * <p>{@link EvaluationContext} contains shared state for an execution of the {@link DirectRunner}
+ * that can be used while evaluating a {@link PTransform}. This consists of views into underlying
+ * state and watermark implementations, access to read and write {@link PCollectionView
+ * PCollectionViews}, and managing the {@link DirectExecutionContext ExecutionContexts}. This
+ * includes executing callbacks asynchronously when state changes to the appropriate point (e.g.
+ * when a {@link PCollectionView} is requested and known to be empty).
  *
- * <p>{@link EvaluationContext} also handles results by committing finalizing bundles based
- * on the current global state and updating the global state appropriately. This includes updating
- * the per-{@link StepAndKey} state, updating global watermarks, and executing any callbacks that
- * can be executed.
+ * <p>{@link EvaluationContext} also handles results by committing finalizing bundles based on the
+ * current global state and updating the global state appropriately. This includes updating the
+ * per-{@link StepAndKey} state, updating global watermarks, and executing any callbacks that can be
+ * executed.
  */
 class EvaluationContext {
   /**
@@ -312,7 +309,7 @@ class EvaluationContext {
   }
 
   /**
-   * Get an {@link ExecutionContext} for the provided {@link AppliedPTransform} and key.
+   * Get a {@link DirectExecutionContext} for the provided {@link AppliedPTransform} and key.
    */
   public DirectExecutionContext getExecutionContext(
       AppliedPTransform<?, ?, ?> application, StructuralKey<?> key) {


[02/16] beam git commit: Delete unused remnants in DirectExecutionContext

Posted by ke...@apache.org.
Delete unused remnants in DirectExecutionContext


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dc585510
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dc585510
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dc585510

Branch: refs/heads/master
Commit: dc585510e6b4def3a0442114d77e96f2b5d4880f
Parents: acce24c
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon May 22 17:46:12 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 23 11:16:27 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectExecutionContext.java      | 11 +----------
 1 file changed, 1 insertion(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/dc585510/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
index 651af8f..8452565 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
@@ -56,9 +56,8 @@ class DirectExecutionContext {
   }
 
   /**
-   * Returns the {@link BaseStepContext} associated with the given step.
+   * Returns the {@link StepContext} associated with the given step.
    */
-  @Override
   public DirectStepContext getOrCreateStepContext(String stepName, String transformName) {
     final String finalStepName = stepName;
     final String finalTransformName = transformName;
@@ -71,14 +70,6 @@ class DirectExecutionContext {
   }
 
   /**
-   * Returns a collection view of all of the {@link BaseStepContext}s.
-   */
-  @Override
-  public Collection<? extends DirectStepContext> getAllStepContexts() {
-    return Collections.unmodifiableCollection(cachedStepContexts.values());
-  }
-
-  /**
    * Step Context for the {@link DirectRunner}.
    */
   public class DirectStepContext implements StepContext {


[14/16] beam git commit: Revise StepContext javadoc

Posted by ke...@apache.org.
Revise StepContext javadoc


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d425b279
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d425b279
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d425b279

Branch: refs/heads/master
Commit: d425b2792f754ed6150f7b47eddf743286a45401
Parents: 0dc0334
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue May 23 11:10:42 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 23 11:16:35 2017 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/beam/runners/core/StepContext.java   | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d425b279/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java
index 60fc402..4d66d66 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java
@@ -18,7 +18,12 @@
 package org.apache.beam.runners.core;
 
 /**
- * Per-step, per-key context used for retrieving state.
+ * The context in which a specific step is executing, including access to state and timers.
+ *
+ * <p>This interface exists as the API between a runner and the support code, but is not user
+ * facing.
+ *
+ * <p>These will often be scoped to a particular step and key, though it is not required.
  */
 public interface StepContext {
 


[06/16] beam git commit: Remove extraneous ExecutionContext parameter to BaseStepContext

Posted by ke...@apache.org.
Remove extraneous ExecutionContext parameter to BaseStepContext


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/248c808a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/248c808a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/248c808a

Branch: refs/heads/master
Commit: 248c808a6603dc2c28a0b55296e0d596b8903a08
Parents: 59322d5
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon May 22 15:36:41 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 23 11:16:27 2017 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/beam/runners/core/BaseStepContext.java  | 4 +---
 .../org/apache/beam/runners/direct/DirectExecutionContext.java   | 2 +-
 2 files changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/248c808a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java
index f0436ac..014fe0d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java
@@ -30,12 +30,10 @@ import org.apache.beam.sdk.values.TupleTag;
  * {@link #stateInternals}.
  */
 public abstract class BaseStepContext implements StepContext {
-  private final ExecutionContext executionContext;
   private final String stepName;
   private final String transformName;
 
-  public BaseStepContext(ExecutionContext executionContext, String stepName, String transformName) {
-    this.executionContext = executionContext;
+  public BaseStepContext(String stepName, String transformName) {
     this.stepName = stepName;
     this.transformName = transformName;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/248c808a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
index e5b88e5..d676f24 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
@@ -64,7 +64,7 @@ class DirectExecutionContext
 
     public DirectStepContext(
         ExecutionContext executionContext, String stepName, String transformName) {
-      super(executionContext, stepName, transformName);
+      super(stepName, transformName);
     }
 
     @Override


[15/16] beam git commit: Remove unused pieces of DirectStepContext

Posted by ke...@apache.org.
Remove unused pieces of DirectStepContext


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b32a1c35
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b32a1c35
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b32a1c35

Branch: refs/heads/master
Commit: b32a1c350398a91b1b1552d5257dab6ab7d1da3a
Parents: d425b27
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue May 23 11:13:19 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 23 11:16:42 2017 -0700

----------------------------------------------------------------------
 .../runners/direct/DirectExecutionContext.java    | 18 +++++-------------
 .../direct/GroupAlsoByWindowEvaluatorFactory.java |  2 +-
 .../runners/direct/ParDoEvaluatorFactory.java     |  2 +-
 ...SplittableProcessElementsEvaluatorFactory.java |  2 +-
 .../direct/StatefulParDoEvaluatorFactory.java     |  2 +-
 .../runners/direct/EvaluationContextTest.java     | 16 ++++++++--------
 .../beam/runners/direct/ParDoEvaluatorTest.java   |  2 +-
 .../direct/StatefulParDoEvaluatorFactoryTest.java |  4 ++--
 8 files changed, 20 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b32a1c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
index 11c1b86..e8ad8d7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
@@ -48,19 +48,17 @@ class DirectExecutionContext {
     this.watermarks = watermarks;
   }
 
-  private DirectStepContext createStepContext(String stepName, String transformName) {
-    return new DirectStepContext(stepName, transformName);
+  private DirectStepContext createStepContext() {
+    return new DirectStepContext();
   }
 
   /**
    * Returns the {@link StepContext} associated with the given step.
    */
-  public DirectStepContext getStepContext(String stepName, String transformName) {
-    final String finalStepName = stepName;
-    final String finalTransformName = transformName;
+  public DirectStepContext getStepContext(String stepName) {
     DirectStepContext context = cachedStepContexts.get(stepName);
     if (context == null) {
-      context = createStepContext(finalStepName, finalTransformName);
+      context = createStepContext();
       cachedStepContexts.put(stepName, context);
     }
     return context;
@@ -72,14 +70,8 @@ class DirectExecutionContext {
   public class DirectStepContext implements StepContext {
     private CopyOnAccessInMemoryStateInternals<?> stateInternals;
     private DirectTimerInternals timerInternals;
-    private final String stepName;
-    private final String transformName;
 
-    public DirectStepContext(
-        String stepName, String transformName) {
-      this.stepName = stepName;
-      this.transformName = transformName;
-    }
+    public DirectStepContext() { }
 
     @Override
     public CopyOnAccessInMemoryStateInternals<?> stateInternals() {

http://git-wip-us.apache.org/repos/asf/beam/blob/b32a1c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 49b7512..1a588ee 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -130,7 +130,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
       stepContext = evaluationContext
           .getExecutionContext(application, inputBundle.getKey())
           .getStepContext(
-              evaluationContext.getStepName(application), application.getTransform().getName());
+              evaluationContext.getStepName(application));
       windowingStrategy =
           (WindowingStrategy<?, BoundedWindow>)
               application.getTransform().getInputWindowingStrategy();

http://git-wip-us.apache.org/repos/asf/beam/blob/b32a1c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index 12c6751..8aa75cf 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -112,7 +112,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
     DirectStepContext stepContext =
         evaluationContext
             .getExecutionContext(application, inputBundleKey)
-            .getStepContext(stepName, stepName);
+            .getStepContext(stepName);
 
     DoFnLifecycleManager fnManager = fnClones.getUnchecked(doFn);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/b32a1c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index 13d9345..b85f481c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -109,7 +109,7 @@ class SplittableProcessElementsEvaluatorFactory<
     final DirectExecutionContext.DirectStepContext stepContext =
         evaluationContext
             .getExecutionContext(application, inputBundle.getKey())
-            .getStepContext(stepName, stepName);
+            .getStepContext(stepName);
 
     final ParDoEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
         parDoEvaluator =

http://git-wip-us.apache.org/repos/asf/beam/blob/b32a1c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index 70d0cf5..506c84c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -163,7 +163,7 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
           evaluationContext
               .getExecutionContext(
                   transformOutputWindow.getTransform(), transformOutputWindow.getKey())
-              .getStepContext(stepName, stepName);
+              .getStepContext(stepName);
 
       final StateNamespace namespace =
           StateNamespaces.window(

http://git-wip-us.apache.org/repos/asf/beam/blob/b32a1c35/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 0e2be8d..80b04f8 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -160,7 +160,7 @@ public class EvaluationContextTest {
 
     StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
 
-    DirectStepContext stepContext = fooContext.getStepContext("s1", "s1");
+    DirectStepContext stepContext = fooContext.getStepContext("s1");
     stepContext.stateInternals().state(StateNamespaces.global(), intBag).add(1);
 
     context.handleResult(
@@ -177,7 +177,7 @@ public class EvaluationContextTest {
             StructuralKey.of("foo", StringUtf8Coder.of()));
     assertThat(
         secondFooContext
-            .getStepContext("s1", "s1")
+            .getStepContext("s1")
             .stateInternals()
             .state(StateNamespaces.global(), intBag)
             .read(),
@@ -194,7 +194,7 @@ public class EvaluationContextTest {
     StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
 
     fooContext
-        .getStepContext("s1", "s1")
+        .getStepContext("s1")
         .stateInternals()
         .state(StateNamespaces.global(), intBag)
         .add(1);
@@ -205,7 +205,7 @@ public class EvaluationContextTest {
     assertThat(barContext, not(equalTo(fooContext)));
     assertThat(
         barContext
-            .getStepContext("s1", "s1")
+            .getStepContext("s1")
             .stateInternals()
             .state(StateNamespaces.global(), intBag)
             .read(),
@@ -221,7 +221,7 @@ public class EvaluationContextTest {
     StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
 
     fooContext
-        .getStepContext("s1", "s1")
+        .getStepContext("s1")
         .stateInternals()
         .state(StateNamespaces.global(), intBag)
         .add(1);
@@ -230,7 +230,7 @@ public class EvaluationContextTest {
         context.getExecutionContext(downstreamProducer, myKey);
     assertThat(
         barContext
-            .getStepContext("s1", "s1")
+            .getStepContext("s1")
             .stateInternals()
             .state(StateNamespaces.global(), intBag)
             .read(),
@@ -246,7 +246,7 @@ public class EvaluationContextTest {
     StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
 
     CopyOnAccessInMemoryStateInternals<?> state =
-        fooContext.getStepContext("s1", "s1").stateInternals();
+        fooContext.getStepContext("s1").stateInternals();
     BagState<Integer> bag = state.state(StateNamespaces.global(), intBag);
     bag.add(1);
     bag.add(2);
@@ -266,7 +266,7 @@ public class EvaluationContextTest {
         context.getExecutionContext(downstreamProducer, myKey);
 
     CopyOnAccessInMemoryStateInternals<?> afterResultState =
-        afterResultContext.getStepContext("s1", "s1").stateInternals();
+        afterResultContext.getStepContext("s1").stateInternals();
     assertThat(afterResultState.state(StateNamespaces.global(), intBag).read(), contains(1, 2, 4));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/b32a1c35/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 22b3b7e..09a21ac 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -141,7 +141,7 @@ public class ParDoEvaluatorTest {
     DirectStepContext stepContext = mock(DirectStepContext.class);
     when(
             executionContext.getStepContext(
-                Mockito.any(String.class), Mockito.any(String.class)))
+                Mockito.any(String.class)))
         .thenReturn(stepContext);
     when(stepContext.getTimerUpdate()).thenReturn(TimerUpdate.empty());
     when(

http://git-wip-us.apache.org/repos/asf/beam/blob/b32a1c35/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
index b233c1b..9366b7c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
@@ -153,7 +153,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
     when(mockEvaluationContext.getExecutionContext(
             eq(producingTransform), Mockito.<StructuralKey>any()))
         .thenReturn(mockExecutionContext);
-    when(mockExecutionContext.getStepContext(anyString(), anyString()))
+    when(mockExecutionContext.getStepContext(anyString()))
         .thenReturn(mockStepContext);
 
     IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(9));
@@ -269,7 +269,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
     when(mockEvaluationContext.getExecutionContext(
             eq(producingTransform), Mockito.<StructuralKey>any()))
         .thenReturn(mockExecutionContext);
-    when(mockExecutionContext.getStepContext(anyString(), anyString()))
+    when(mockExecutionContext.getStepContext(anyString()))
         .thenReturn(mockStepContext);
     when(mockEvaluationContext.createBundle(Matchers.<PCollection<Integer>>any()))
         .thenReturn(mockUncommittedBundle);


[10/16] beam git commit: Remove writePCollectionViewData from the Beam codebase

Posted by ke...@apache.org.
Remove writePCollectionViewData from the Beam codebase


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/32c6cb16
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/32c6cb16
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/32c6cb16

Branch: refs/heads/master
Commit: 32c6cb160f42e401f3e170cc8ed18d76c627d3e4
Parents: 5ac24e0
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon May 22 16:26:00 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 23 11:16:27 2017 -0700

----------------------------------------------------------------------
 .../apex/translation/utils/NoOpStepContext.java    | 13 -------------
 .../apache/beam/runners/core/BaseStepContext.java  | 14 --------------
 .../org/apache/beam/runners/core/StepContext.java  | 17 -----------------
 .../runners/direct/DirectExecutionContext.java     | 13 -------------
 .../functions/FlinkNoOpStepContext.java            | 16 +---------------
 .../wrappers/streaming/DoFnOperator.java           | 11 -----------
 .../spark/translation/SparkProcessContext.java     | 14 +-------------
 .../beam/fn/harness/fake/FakeStepContext.java      | 16 +---------------
 8 files changed, 3 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/32c6cb16/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
index 51e843b..820b189 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
@@ -17,15 +17,10 @@
  */
 package org.apache.beam.runners.apex.translation.utils;
 
-import java.io.IOException;
 import java.io.Serializable;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * Serializable {@link StepContext} that does nothing.
@@ -44,14 +39,6 @@ public class NoOpStepContext implements StepContext, Serializable {
   }
 
   @Override
-  public <T, W extends BoundedWindow> void writePCollectionViewData(TupleTag<?> tag,
-      Iterable<WindowedValue<T>> data,
-      Coder<Iterable<WindowedValue<T>>> dataCoder, W window, Coder<W> windowCoder) throws
-      IOException {
-
-  }
-
-  @Override
   public StateInternals stateInternals() {
     return null;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/32c6cb16/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java
index 014fe0d..e639c46 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java
@@ -17,12 +17,6 @@
  */
 package org.apache.beam.runners.core;
 
-import java.io.IOException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-
 /**
  * Base class for implementations of {@link StepContext}.
  *
@@ -49,14 +43,6 @@ public abstract class BaseStepContext implements StepContext {
   }
 
   @Override
-  public <T, W extends BoundedWindow> void writePCollectionViewData(
-      TupleTag<?> tag,
-      Iterable<WindowedValue<T>> data, Coder<Iterable<WindowedValue<T>>> dataCoder,
-      W window, Coder<W> windowCoder) throws IOException {
-    throw new UnsupportedOperationException("Not implemented.");
-  }
-
-  @Override
   public abstract StateInternals stateInternals();
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/32c6cb16/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java
index fd2575d..62a81f1 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java
@@ -17,12 +17,6 @@
  */
 package org.apache.beam.runners.core;
 
-import java.io.IOException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-
 /**
  * Per-step, per-key context used for retrieving state.
  */
@@ -38,17 +32,6 @@ public interface StepContext {
    */
   String getTransformName();
 
-  /**
-   * Writes the given {@code PCollectionView} data to a globally accessible location.
-   */
-  <T, W extends BoundedWindow> void writePCollectionViewData(
-      TupleTag<?> tag,
-      Iterable<WindowedValue<T>> data,
-      Coder<Iterable<WindowedValue<T>>> dataCoder,
-      W window,
-      Coder<W> windowCoder)
-          throws IOException;
-
   StateInternals stateInternals();
 
   TimerInternals timerInternals();

http://git-wip-us.apache.org/repos/asf/beam/blob/32c6cb16/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
index 2a75ef5..39174d6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.direct;
 
-import java.io.IOException;
 import org.apache.beam.runners.core.BaseExecutionContext;
 import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.core.StepContext;
@@ -25,10 +24,6 @@ import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * Execution Context for the {@link DirectRunner}.
@@ -112,14 +107,6 @@ class DirectExecutionContext
       return transformName;
     }
 
-    @Override
-    public <T, W extends BoundedWindow> void writePCollectionViewData(
-        TupleTag<?> tag,
-        Iterable<WindowedValue<T>> data, Coder<Iterable<WindowedValue<T>>> dataCoder,
-        W window, Coder<W> windowCoder) throws IOException {
-      throw new UnsupportedOperationException("Not implemented.");
-    }
-
     /**
      * Gets the timer update of the {@link TimerInternals} of this {@link DirectStepContext},
      * which is empty if the {@link TimerInternals} were never accessed.

http://git-wip-us.apache.org/repos/asf/beam/blob/32c6cb16/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
index d999494..1ff322e 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
@@ -17,14 +17,9 @@
  */
 package org.apache.beam.runners.flink.translation.functions;
 
-import java.io.IOException;
-import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * A {@link StepContext} for Flink Batch Runner execution.
@@ -42,15 +37,6 @@ public class FlinkNoOpStepContext implements StepContext {
   }
 
   @Override
-  public <T, W extends BoundedWindow> void writePCollectionViewData(
-      TupleTag<?> tag,
-      Iterable<WindowedValue<T>> data,
-      Coder<Iterable<WindowedValue<T>>> dataCoder,
-      W window,
-      Coder<W> windowCoder) throws IOException {
-  }
-
-  @Override
   public StateInternals stateInternals() {
     return null;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/32c6cb16/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 2bb9c20..4f8998e 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -23,7 +23,6 @@ import com.google.common.base.Optional;
 import com.google.common.collect.Iterables;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -688,16 +687,6 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     }
 
     @Override
-    public <T, W extends BoundedWindow> void writePCollectionViewData(
-        TupleTag<?> tag,
-        Iterable<WindowedValue<T>> data,
-        Coder<Iterable<WindowedValue<T>>> dataCoder,
-        W window,
-        Coder<W> windowCoder) throws IOException {
-      throw new UnsupportedOperationException("Writing side-input data is not supported.");
-    }
-
-    @Override
     public StateInternals stateInternals() {
       return stateInternals;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/32c6cb16/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
index 31e616c..e693143 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
@@ -20,19 +20,15 @@ package org.apache.beam.runners.spark.translation;
 
 import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.Lists;
-import java.io.IOException;
 import java.util.Iterator;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
-import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
 
 
 /**
@@ -110,14 +106,6 @@ class SparkProcessContext<FnInputT, FnOutputT, OutputT> {
     }
 
     @Override
-    public <T, W extends BoundedWindow> void writePCollectionViewData(
-        TupleTag<?> tag,
-        Iterable<WindowedValue<T>> data,
-        Coder<Iterable<WindowedValue<T>>> dataCoder,
-        W window,
-        Coder<W> windowCoder) throws IOException { }
-
-    @Override
     public StateInternals stateInternals() {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/32c6cb16/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
index 750c167..3f6a2da 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
@@ -18,14 +18,9 @@
 
 package org.apache.beam.fn.harness.fake;
 
-import java.io.IOException;
-import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * A fake {@link StepContext} factory that performs no-ops.
@@ -42,15 +37,6 @@ public class FakeStepContext implements StepContext {
   }
 
   @Override
-  public <T, W extends BoundedWindow> void writePCollectionViewData(
-      TupleTag<?> tag,
-      Iterable<WindowedValue<T>> data,
-      Coder<Iterable<WindowedValue<T>>> dataCoder,
-      W window,
-      Coder<W> windowCoder) throws IOException {
-  }
-
-  @Override
   public StateInternals stateInternals() {
     throw new UnsupportedOperationException();
   }


[11/16] beam git commit: Inline and delete BaseExecutionContext

Posted by ke...@apache.org.
Inline and delete BaseExecutionContext


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0be3cf34
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0be3cf34
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0be3cf34

Branch: refs/heads/master
Commit: 0be3cf3462c19f0b007b2329c95ea4865d22cad5
Parents: 32c6cb1
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon May 22 16:50:41 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 23 11:16:27 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/BaseExecutionContext.java | 102 -------------------
 .../runners/direct/DirectExecutionContext.java  |  39 +++++--
 2 files changed, 32 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0be3cf34/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
deleted file mode 100644
index 877fa0a..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-/**
- * Base class for implementations of {@link ExecutionContext}.
- *
- * <p>A concrete subclass should implement {@link #createStepContext} to create the appropriate
- * {@link BaseStepContext} implementation. Any {@code StepContext} created will
- * be cached for the lifetime of this {@link ExecutionContext}.
- *
- * <p>BaseExecutionContext is generic to allow implementing subclasses to return a concrete subclass
- * of {@link BaseStepContext} from {@link #getOrCreateStepContext(String, String)} and
- * {@link #getAllStepContexts()} without forcing each subclass to override the method, e.g.
- * <pre>{@code
- * {@literal @}Override
- * StreamingModeExecutionContext.StepContext getOrCreateStepContext(...) {
- *   return (StreamingModeExecutionContext.StepContext) super.getOrCreateStepContext(...);
- * }
- * }</pre>
- *
- * <p>When a subclass of {@code BaseExecutionContext} has been downcast, the return types of
- * {@link #createStepContext(String, String)},
- * {@link #getOrCreateStepContext(String, String)}, and {@link #getAllStepContexts()}
- * will be appropriately specialized.
- */
-public abstract class BaseExecutionContext<T extends StepContext>
-    implements ExecutionContext {
-
-  private Map<String, T> cachedStepContexts = new LinkedHashMap<>();
-
-  /**
-   * Implementations should override this to create the specific type
-   * of {@link BaseStepContext} they need.
-   */
-  protected abstract T createStepContext(String stepName, String transformName);
-
-  /**
-   * Returns the {@link BaseStepContext} associated with the given step.
-   */
-  @Override
-  public T getOrCreateStepContext(String stepName, String transformName) {
-    final String finalStepName = stepName;
-    final String finalTransformName = transformName;
-    return getOrCreateStepContext(
-        stepName,
-        new CreateStepContextFunction<T>() {
-          @Override
-          public T create() {
-            return createStepContext(finalStepName, finalTransformName);
-          }
-        });
-  }
-
-  /**
-   * Factory method interface to create an execution context if none exists during
-   * {@link #getOrCreateStepContext(String, CreateStepContextFunction)}.
-   */
-  protected interface CreateStepContextFunction<T extends org.apache.beam.runners.core.StepContext> {
-    T create();
-  }
-
-  protected final T getOrCreateStepContext(String stepName,
-      CreateStepContextFunction<T> createContextFunc) {
-    T context = cachedStepContexts.get(stepName);
-    if (context == null) {
-      context = createContextFunc.create();
-      cachedStepContexts.put(stepName, context);
-    }
-
-    return context;
-  }
-
-  /**
-   * Returns a collection view of all of the {@link BaseStepContext}s.
-   */
-  @Override
-  public Collection<? extends T> getAllStepContexts() {
-    return Collections.unmodifiableCollection(cachedStepContexts.values());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/0be3cf34/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
index 39174d6..9b68662 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
@@ -17,11 +17,14 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.runners.core.BaseExecutionContext;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.beam.runners.core.BaseStepContext;
 import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
 
@@ -31,12 +34,12 @@ import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
  * <p>This implementation is not thread safe. A new {@link DirectExecutionContext} must be created
  * for each thread that requires it.
  */
-class DirectExecutionContext
-    extends BaseExecutionContext<DirectStepContext> {
+class DirectExecutionContext implements ExecutionContext {
   private final Clock clock;
   private final StructuralKey<?> key;
   private final CopyOnAccessInMemoryStateInternals existingState;
   private final TransformWatermarks watermarks;
+  private Map<String, DirectStepContext> cachedStepContexts = new LinkedHashMap<>();
 
   public DirectExecutionContext(
       Clock clock,
@@ -49,9 +52,31 @@ class DirectExecutionContext
     this.watermarks = watermarks;
   }
 
+  private DirectStepContext createStepContext(String stepName, String transformName) {
+    return new DirectStepContext(stepName, transformName);
+  }
+
+  /**
+   * Returns the {@link BaseStepContext} associated with the given step.
+   */
+  @Override
+  public DirectStepContext getOrCreateStepContext(String stepName, String transformName) {
+    final String finalStepName = stepName;
+    final String finalTransformName = transformName;
+    DirectStepContext context = cachedStepContexts.get(stepName);
+    if (context == null) {
+      context = createStepContext(finalStepName, finalTransformName);
+      cachedStepContexts.put(stepName, context);
+    }
+    return context;
+  }
+
+  /**
+   * Returns a collection view of all of the {@link BaseStepContext}s.
+   */
   @Override
-  protected DirectStepContext createStepContext(String stepName, String transformName) {
-    return new DirectStepContext(this, stepName, transformName);
+  public Collection<? extends DirectStepContext> getAllStepContexts() {
+    return Collections.unmodifiableCollection(cachedStepContexts.values());
   }
 
   /**
@@ -64,7 +89,7 @@ class DirectExecutionContext
     private final String transformName;
 
     public DirectStepContext(
-        ExecutionContext executionContext, String stepName, String transformName) {
+        String stepName, String transformName) {
       this.stepName = stepName;
       this.transformName = transformName;
     }


[03/16] beam git commit: Remove unused StepContext name methods

Posted by ke...@apache.org.
Remove unused StepContext name methods


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/acce24ce
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/acce24ce
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/acce24ce

Branch: refs/heads/master
Commit: acce24ce1388b7953fbb9d87da5bb2271286c58a
Parents: 97c230a
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon May 22 17:42:10 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 23 11:16:27 2017 -0700

----------------------------------------------------------------------
 .../runners/apex/translation/utils/NoOpStepContext.java   | 10 ----------
 .../org/apache/beam/runners/core/BaseStepContext.java     | 10 ----------
 .../java/org/apache/beam/runners/core/StepContext.java    | 10 ----------
 .../beam/runners/direct/DirectExecutionContext.java       | 10 ----------
 .../flink/translation/functions/FlinkNoOpStepContext.java | 10 ----------
 .../translation/wrappers/streaming/DoFnOperator.java      | 10 ----------
 .../runners/spark/translation/SparkProcessContext.java    |  9 ---------
 .../org/apache/beam/fn/harness/fake/FakeStepContext.java  |  9 ---------
 8 files changed, 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/acce24ce/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
index 820b189..b49e4da 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
@@ -29,16 +29,6 @@ public class NoOpStepContext implements StepContext, Serializable {
   private static final long serialVersionUID = 1L;
 
   @Override
-  public String getStepName() {
-    return null;
-  }
-
-  @Override
-  public String getTransformName() {
-    return null;
-  }
-
-  @Override
   public StateInternals stateInternals() {
     return null;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/acce24ce/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java
index e639c46..4abd4d2 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java
@@ -33,16 +33,6 @@ public abstract class BaseStepContext implements StepContext {
   }
 
   @Override
-  public String getStepName() {
-    return stepName;
-  }
-
-  @Override
-  public String getTransformName() {
-    return transformName;
-  }
-
-  @Override
   public abstract StateInternals stateInternals();
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/acce24ce/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java
index 62a81f1..60fc402 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java
@@ -22,16 +22,6 @@ package org.apache.beam.runners.core;
  */
 public interface StepContext {
 
-  /**
-   * The name of the step.
-   */
-  String getStepName();
-
-  /**
-   * The name of the transform for the step.
-   */
-  String getTransformName();
-
   StateInternals stateInternals();
 
   TimerInternals timerInternals();

http://git-wip-us.apache.org/repos/asf/beam/blob/acce24ce/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
index 05dbebc..651af8f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
@@ -121,16 +121,6 @@ class DirectExecutionContext {
       return null;
     }
 
-    @Override
-    public String getStepName() {
-      return stepName;
-    }
-
-    @Override
-    public String getTransformName() {
-      return transformName;
-    }
-
     /**
      * Gets the timer update of the {@link TimerInternals} of this {@link DirectStepContext},
      * which is empty if the {@link TimerInternals} were never accessed.

http://git-wip-us.apache.org/repos/asf/beam/blob/acce24ce/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
index 1ff322e..9c7b636 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
@@ -27,16 +27,6 @@ import org.apache.beam.runners.core.TimerInternals;
 public class FlinkNoOpStepContext implements StepContext {
 
   @Override
-  public String getStepName() {
-    return null;
-  }
-
-  @Override
-  public String getTransformName() {
-    return null;
-  }
-
-  @Override
   public StateInternals stateInternals() {
     return null;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/acce24ce/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 4f8998e..d2ab7e1 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -677,16 +677,6 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
   protected class StepContext implements org.apache.beam.runners.core.StepContext {
 
     @Override
-    public String getStepName() {
-      return null;
-    }
-
-    @Override
-    public String getTransformName() {
-      return null;
-    }
-
-    @Override
     public StateInternals stateInternals() {
       return stateInternals;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/acce24ce/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
index e693143..f4ab7d9 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
@@ -95,15 +95,6 @@ class SparkProcessContext<FnInputT, FnOutputT, OutputT> {
   }
 
   static class NoOpStepContext implements StepContext {
-    @Override
-    public String getStepName() {
-      return null;
-    }
-
-    @Override
-    public String getTransformName() {
-      return null;
-    }
 
     @Override
     public StateInternals stateInternals() {

http://git-wip-us.apache.org/repos/asf/beam/blob/acce24ce/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
index 3f6a2da..bdf138b 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
@@ -26,15 +26,6 @@ import org.apache.beam.runners.core.TimerInternals;
  * A fake {@link StepContext} factory that performs no-ops.
  */
 public class FakeStepContext implements StepContext {
-  @Override
-  public String getStepName() {
-    return "TODO";
-  }
-
-  @Override
-  public String getTransformName() {
-    return "TODO";
-  }
 
   @Override
   public StateInternals stateInternals() {


[08/16] beam git commit: Implement StepContext directly in the DirectRunner

Posted by ke...@apache.org.
Implement StepContext directly in the DirectRunner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5ac24e0a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5ac24e0a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5ac24e0a

Branch: refs/heads/master
Commit: 5ac24e0a89b95feafccbe381bdde9c11fdf82a88
Parents: 248c808
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon May 22 15:44:17 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 23 11:16:27 2017 -0700

----------------------------------------------------------------------
 .../runners/direct/DirectExecutionContext.java  | 33 +++++++++++++++++---
 1 file changed, 29 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5ac24e0a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
index d676f24..2a75ef5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
@@ -17,13 +17,18 @@
  */
 package org.apache.beam.runners.direct;
 
+import java.io.IOException;
 import org.apache.beam.runners.core.BaseExecutionContext;
-import org.apache.beam.runners.core.BaseStepContext;
 import org.apache.beam.runners.core.ExecutionContext;
+import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * Execution Context for the {@link DirectRunner}.
@@ -57,14 +62,16 @@ class DirectExecutionContext
   /**
    * Step Context for the {@link DirectRunner}.
    */
-  public class DirectStepContext
-      extends BaseStepContext {
+  public class DirectStepContext implements StepContext {
     private CopyOnAccessInMemoryStateInternals<?> stateInternals;
     private DirectTimerInternals timerInternals;
+    private final String stepName;
+    private final String transformName;
 
     public DirectStepContext(
         ExecutionContext executionContext, String stepName, String transformName) {
-      super(stepName, transformName);
+      this.stepName = stepName;
+      this.transformName = transformName;
     }
 
     @Override
@@ -95,6 +102,24 @@ class DirectExecutionContext
       return null;
     }
 
+    @Override
+    public String getStepName() {
+      return stepName;
+    }
+
+    @Override
+    public String getTransformName() {
+      return transformName;
+    }
+
+    @Override
+    public <T, W extends BoundedWindow> void writePCollectionViewData(
+        TupleTag<?> tag,
+        Iterable<WindowedValue<T>> data, Coder<Iterable<WindowedValue<T>>> dataCoder,
+        W window, Coder<W> windowCoder) throws IOException {
+      throw new UnsupportedOperationException("Not implemented.");
+    }
+
     /**
      * Gets the timer update of the {@link TimerInternals} of this {@link DirectStepContext},
      * which is empty if the {@link TimerInternals} were never accessed.


[09/16] beam git commit: Shorten excessive name in DirectExecutionContext

Posted by ke...@apache.org.
Shorten excessive name in DirectExecutionContext


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0dc0334a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0dc0334a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0dc0334a

Branch: refs/heads/master
Commit: 0dc0334a0c1350c1693019f104dac911a618c9c8
Parents: 62115b2
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon May 22 17:49:01 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 23 11:16:27 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectExecutionContext.java |  2 +-
 .../direct/GroupAlsoByWindowEvaluatorFactory.java   |  2 +-
 .../beam/runners/direct/ParDoEvaluatorFactory.java  |  2 +-
 .../SplittableProcessElementsEvaluatorFactory.java  |  2 +-
 .../direct/StatefulParDoEvaluatorFactory.java       |  2 +-
 .../beam/runners/direct/EvaluationContextTest.java  | 16 ++++++++--------
 .../beam/runners/direct/ParDoEvaluatorTest.java     |  2 +-
 .../direct/StatefulParDoEvaluatorFactoryTest.java   |  4 ++--
 8 files changed, 16 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0dc0334a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
index cca5719..11c1b86 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
@@ -55,7 +55,7 @@ class DirectExecutionContext {
   /**
    * Returns the {@link StepContext} associated with the given step.
    */
-  public DirectStepContext getOrCreateStepContext(String stepName, String transformName) {
+  public DirectStepContext getStepContext(String stepName, String transformName) {
     final String finalStepName = stepName;
     final String finalTransformName = transformName;
     DirectStepContext context = cachedStepContexts.get(stepName);

http://git-wip-us.apache.org/repos/asf/beam/blob/0dc0334a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 78ef7fe..49b7512 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -129,7 +129,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
       structuralKey = inputBundle.getKey();
       stepContext = evaluationContext
           .getExecutionContext(application, inputBundle.getKey())
-          .getOrCreateStepContext(
+          .getStepContext(
               evaluationContext.getStepName(application), application.getTransform().getName());
       windowingStrategy =
           (WindowingStrategy<?, BoundedWindow>)

http://git-wip-us.apache.org/repos/asf/beam/blob/0dc0334a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index 74470bf..12c6751 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -112,7 +112,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
     DirectStepContext stepContext =
         evaluationContext
             .getExecutionContext(application, inputBundleKey)
-            .getOrCreateStepContext(stepName, stepName);
+            .getStepContext(stepName, stepName);
 
     DoFnLifecycleManager fnManager = fnClones.getUnchecked(doFn);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0dc0334a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index dc85d87..13d9345 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -109,7 +109,7 @@ class SplittableProcessElementsEvaluatorFactory<
     final DirectExecutionContext.DirectStepContext stepContext =
         evaluationContext
             .getExecutionContext(application, inputBundle.getKey())
-            .getOrCreateStepContext(stepName, stepName);
+            .getStepContext(stepName, stepName);
 
     final ParDoEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
         parDoEvaluator =

http://git-wip-us.apache.org/repos/asf/beam/blob/0dc0334a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index 985c3be..70d0cf5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -163,7 +163,7 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
           evaluationContext
               .getExecutionContext(
                   transformOutputWindow.getTransform(), transformOutputWindow.getKey())
-              .getOrCreateStepContext(stepName, stepName);
+              .getStepContext(stepName, stepName);
 
       final StateNamespace namespace =
           StateNamespaces.window(

http://git-wip-us.apache.org/repos/asf/beam/blob/0dc0334a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 72b1bbc..0e2be8d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -160,7 +160,7 @@ public class EvaluationContextTest {
 
     StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
 
-    DirectStepContext stepContext = fooContext.getOrCreateStepContext("s1", "s1");
+    DirectStepContext stepContext = fooContext.getStepContext("s1", "s1");
     stepContext.stateInternals().state(StateNamespaces.global(), intBag).add(1);
 
     context.handleResult(
@@ -177,7 +177,7 @@ public class EvaluationContextTest {
             StructuralKey.of("foo", StringUtf8Coder.of()));
     assertThat(
         secondFooContext
-            .getOrCreateStepContext("s1", "s1")
+            .getStepContext("s1", "s1")
             .stateInternals()
             .state(StateNamespaces.global(), intBag)
             .read(),
@@ -194,7 +194,7 @@ public class EvaluationContextTest {
     StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
 
     fooContext
-        .getOrCreateStepContext("s1", "s1")
+        .getStepContext("s1", "s1")
         .stateInternals()
         .state(StateNamespaces.global(), intBag)
         .add(1);
@@ -205,7 +205,7 @@ public class EvaluationContextTest {
     assertThat(barContext, not(equalTo(fooContext)));
     assertThat(
         barContext
-            .getOrCreateStepContext("s1", "s1")
+            .getStepContext("s1", "s1")
             .stateInternals()
             .state(StateNamespaces.global(), intBag)
             .read(),
@@ -221,7 +221,7 @@ public class EvaluationContextTest {
     StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
 
     fooContext
-        .getOrCreateStepContext("s1", "s1")
+        .getStepContext("s1", "s1")
         .stateInternals()
         .state(StateNamespaces.global(), intBag)
         .add(1);
@@ -230,7 +230,7 @@ public class EvaluationContextTest {
         context.getExecutionContext(downstreamProducer, myKey);
     assertThat(
         barContext
-            .getOrCreateStepContext("s1", "s1")
+            .getStepContext("s1", "s1")
             .stateInternals()
             .state(StateNamespaces.global(), intBag)
             .read(),
@@ -246,7 +246,7 @@ public class EvaluationContextTest {
     StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
 
     CopyOnAccessInMemoryStateInternals<?> state =
-        fooContext.getOrCreateStepContext("s1", "s1").stateInternals();
+        fooContext.getStepContext("s1", "s1").stateInternals();
     BagState<Integer> bag = state.state(StateNamespaces.global(), intBag);
     bag.add(1);
     bag.add(2);
@@ -266,7 +266,7 @@ public class EvaluationContextTest {
         context.getExecutionContext(downstreamProducer, myKey);
 
     CopyOnAccessInMemoryStateInternals<?> afterResultState =
-        afterResultContext.getOrCreateStepContext("s1", "s1").stateInternals();
+        afterResultContext.getStepContext("s1", "s1").stateInternals();
     assertThat(afterResultState.state(StateNamespaces.global(), intBag).read(), contains(1, 2, 4));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0dc0334a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 286e44d..22b3b7e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -140,7 +140,7 @@ public class ParDoEvaluatorTest {
     DirectExecutionContext executionContext = mock(DirectExecutionContext.class);
     DirectStepContext stepContext = mock(DirectStepContext.class);
     when(
-            executionContext.getOrCreateStepContext(
+            executionContext.getStepContext(
                 Mockito.any(String.class), Mockito.any(String.class)))
         .thenReturn(stepContext);
     when(stepContext.getTimerUpdate()).thenReturn(TimerUpdate.empty());

http://git-wip-us.apache.org/repos/asf/beam/blob/0dc0334a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
index eb54d5c..b233c1b 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
@@ -153,7 +153,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
     when(mockEvaluationContext.getExecutionContext(
             eq(producingTransform), Mockito.<StructuralKey>any()))
         .thenReturn(mockExecutionContext);
-    when(mockExecutionContext.getOrCreateStepContext(anyString(), anyString()))
+    when(mockExecutionContext.getStepContext(anyString(), anyString()))
         .thenReturn(mockStepContext);
 
     IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(9));
@@ -269,7 +269,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
     when(mockEvaluationContext.getExecutionContext(
             eq(producingTransform), Mockito.<StructuralKey>any()))
         .thenReturn(mockExecutionContext);
-    when(mockExecutionContext.getOrCreateStepContext(anyString(), anyString()))
+    when(mockExecutionContext.getStepContext(anyString(), anyString()))
         .thenReturn(mockStepContext);
     when(mockEvaluationContext.createBundle(Matchers.<PCollection<Integer>>any()))
         .thenReturn(mockUncommittedBundle);


[04/16] beam git commit: Remove StepContext.noteOutput

Posted by ke...@apache.org.
Remove StepContext.noteOutput


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bed1c53f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bed1c53f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bed1c53f

Branch: refs/heads/master
Commit: bed1c53fb47e2d623d6671ce69b82579992df642
Parents: 98a7555
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon May 22 15:30:33 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 23 11:16:27 2017 -0700

----------------------------------------------------------------------
 .../apex/translation/utils/NoOpStepContext.java     |  8 --------
 .../beam/runners/core/BaseExecutionContext.java     | 16 ----------------
 .../apache/beam/runners/core/ExecutionContext.java  | 16 ----------------
 .../apache/beam/runners/core/SimpleDoFnRunner.java  |  6 ------
 .../org/apache/beam/runners/core/StepContext.java   | 15 ---------------
 .../translation/functions/FlinkNoOpStepContext.java | 10 ----------
 .../wrappers/streaming/DoFnOperator.java            |  6 ------
 .../spark/translation/SparkProcessContext.java      |  6 ------
 .../beam/fn/harness/fake/FakeStepContext.java       |  8 --------
 9 files changed, 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/bed1c53f/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
index 241a985..51e843b 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
@@ -44,14 +44,6 @@ public class NoOpStepContext implements StepContext, Serializable {
   }
 
   @Override
-  public void noteOutput(WindowedValue<?> output) {
-  }
-
-  @Override
-  public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) {
-  }
-
-  @Override
   public <T, W extends BoundedWindow> void writePCollectionViewData(TupleTag<?> tag,
       Iterable<WindowedValue<T>> data,
       Coder<Iterable<WindowedValue<T>>> dataCoder, W window, Coder<W> windowCoder) throws

http://git-wip-us.apache.org/repos/asf/beam/blob/bed1c53f/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
index ed37143..a006999 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
@@ -104,12 +104,6 @@ public abstract class BaseExecutionContext<T extends StepContext>
     return Collections.unmodifiableCollection(cachedStepContexts.values());
   }
 
-  @Override
-  public void noteOutput(WindowedValue<?> output) {}
-
-  @Override
-  public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) {}
-
   /**
    * Base class for implementations of {@link org.apache.beam.runners.core.StepContext}.
    *
@@ -138,16 +132,6 @@ public abstract class BaseExecutionContext<T extends StepContext>
     }
 
     @Override
-    public void noteOutput(WindowedValue<?> output) {
-      executionContext.noteOutput(output);
-    }
-
-    @Override
-    public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) {
-      executionContext.noteOutput(tag, output);
-    }
-
-    @Override
     public <T, W extends BoundedWindow> void writePCollectionViewData(
         TupleTag<?> tag,
         Iterable<WindowedValue<T>> data, Coder<Iterable<WindowedValue<T>>> dataCoder,

http://git-wip-us.apache.org/repos/asf/beam/blob/bed1c53f/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
index f431c92..eac3599 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
@@ -18,9 +18,6 @@
 package org.apache.beam.runners.core;
 
 import java.util.Collection;
-import org.apache.beam.sdk.transforms.DoFn.WindowedContext;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * Context for the current execution. This is guaranteed to exist during processing,
@@ -36,17 +33,4 @@ public interface ExecutionContext {
    * Returns a collection view of all of the {@link StepContext}s.
    */
   Collection<? extends StepContext> getAllStepContexts();
-
-  /**
-   * Hook for subclasses to implement that will be called whenever
-   * {@link WindowedContext#output(TupleTag, Object)} is called.
-   */
-  void noteOutput(WindowedValue<?> output);
-
-  /**
-   * Hook for subclasses to implement that will be called whenever
-   * {@link WindowedContext#output(TupleTag, Object)} is called.
-   */
-  void noteOutput(TupleTag<?> tag, WindowedValue<?> output);
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bed1c53f/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index adbe62e..97b0b33 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -352,9 +352,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
 
     void outputWindowedValue(WindowedValue<OutputT> windowedElem) {
       outputManager.output(mainOutputTag, windowedElem);
-      if (stepContext != null) {
-        stepContext.noteOutput(windowedElem);
-      }
     }
 
     private <T> void outputWindowedValue(
@@ -380,9 +377,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
       }
 
       outputManager.output(tag, windowedElem);
-      if (stepContext != null) {
-        stepContext.noteOutput(tag, windowedElem);
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bed1c53f/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java
index a414830..fd2575d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.core;
 
 import java.io.IOException;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn.WindowedContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
@@ -40,20 +39,6 @@ public interface StepContext {
   String getTransformName();
 
   /**
-   * Hook for subclasses to implement that will be called whenever
-   * {@link WindowedContext#output}
-   * is called.
-   */
-  void noteOutput(WindowedValue<?> output);
-
-  /**
-   * Hook for subclasses to implement that will be called whenever
-   * {@link WindowedContext#output}
-   * is called.
-   */
-  void noteOutput(TupleTag<?> tag, WindowedValue<?> output);
-
-  /**
    * Writes the given {@code PCollectionView} data to a globally accessible location.
    */
   <T, W extends BoundedWindow> void writePCollectionViewData(

http://git-wip-us.apache.org/repos/asf/beam/blob/bed1c53f/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
index c394ebd..d999494 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
@@ -42,16 +42,6 @@ public class FlinkNoOpStepContext implements StepContext {
   }
 
   @Override
-  public void noteOutput(WindowedValue<?> output) {
-
-  }
-
-  @Override
-  public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) {
-
-  }
-
-  @Override
   public <T, W extends BoundedWindow> void writePCollectionViewData(
       TupleTag<?> tag,
       Iterable<WindowedValue<T>> data,

http://git-wip-us.apache.org/repos/asf/beam/blob/bed1c53f/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index c9f106a..2bb9c20 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -688,12 +688,6 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     }
 
     @Override
-    public void noteOutput(WindowedValue<?> output) {}
-
-    @Override
-    public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) {}
-
-    @Override
     public <T, W extends BoundedWindow> void writePCollectionViewData(
         TupleTag<?> tag,
         Iterable<WindowedValue<T>> data,

http://git-wip-us.apache.org/repos/asf/beam/blob/bed1c53f/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
index 9147422..31e616c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
@@ -110,12 +110,6 @@ class SparkProcessContext<FnInputT, FnOutputT, OutputT> {
     }
 
     @Override
-    public void noteOutput(WindowedValue<?> output) { }
-
-    @Override
-    public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) { }
-
-    @Override
     public <T, W extends BoundedWindow> void writePCollectionViewData(
         TupleTag<?> tag,
         Iterable<WindowedValue<T>> data,

http://git-wip-us.apache.org/repos/asf/beam/blob/bed1c53f/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
index b206bc7..750c167 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
@@ -42,14 +42,6 @@ public class FakeStepContext implements StepContext {
   }
 
   @Override
-  public void noteOutput(WindowedValue<?> output) {
-  }
-
-  @Override
-  public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) {
-  }
-
-  @Override
   public <T, W extends BoundedWindow> void writePCollectionViewData(
       TupleTag<?> tag,
       Iterable<WindowedValue<T>> data,


[05/16] beam git commit: Rename BaseExecutionContext.StepContext to BaseStepContext

Posted by ke...@apache.org.
Rename BaseExecutionContext.StepContext to BaseStepContext


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8b7a1f6d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8b7a1f6d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8b7a1f6d

Branch: refs/heads/master
Commit: 8b7a1f6dfe0ac33814a0b0c67f37f47ab449ec4b
Parents: bed1c53
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon May 22 15:34:37 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 23 11:16:27 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/BaseExecutionContext.java       | 14 +++++++-------
 .../beam/runners/core/SimpleDoFnRunnerTest.java       |  5 +++--
 .../beam/runners/core/StatefulDoFnRunnerTest.java     |  5 +++--
 .../beam/runners/direct/DirectExecutionContext.java   |  2 +-
 4 files changed, 14 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8b7a1f6d/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
index a006999..5667250 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
@@ -31,11 +31,11 @@ import org.apache.beam.sdk.values.TupleTag;
  * Base class for implementations of {@link ExecutionContext}.
  *
  * <p>A concrete subclass should implement {@link #createStepContext} to create the appropriate
- * {@link StepContext} implementation. Any {@code StepContext} created will
+ * {@link BaseStepContext} implementation. Any {@code StepContext} created will
  * be cached for the lifetime of this {@link ExecutionContext}.
  *
  * <p>BaseExecutionContext is generic to allow implementing subclasses to return a concrete subclass
- * of {@link StepContext} from {@link #getOrCreateStepContext(String, String)} and
+ * of {@link BaseStepContext} from {@link #getOrCreateStepContext(String, String)} and
  * {@link #getAllStepContexts()} without forcing each subclass to override the method, e.g.
  * <pre>{@code
  * {@literal @}Override
@@ -56,12 +56,12 @@ public abstract class BaseExecutionContext<T extends StepContext>
 
   /**
    * Implementations should override this to create the specific type
-   * of {@link StepContext} they need.
+   * of {@link BaseStepContext} they need.
    */
   protected abstract T createStepContext(String stepName, String transformName);
 
   /**
-   * Returns the {@link StepContext} associated with the given step.
+   * Returns the {@link BaseStepContext} associated with the given step.
    */
   @Override
   public T getOrCreateStepContext(String stepName, String transformName) {
@@ -97,7 +97,7 @@ public abstract class BaseExecutionContext<T extends StepContext>
   }
 
   /**
-   * Returns a collection view of all of the {@link StepContext}s.
+   * Returns a collection view of all of the {@link BaseStepContext}s.
    */
   @Override
   public Collection<? extends T> getAllStepContexts() {
@@ -110,12 +110,12 @@ public abstract class BaseExecutionContext<T extends StepContext>
    * <p>To complete a concrete subclass, implement {@link #timerInternals} and
    * {@link #stateInternals}.
    */
-  public abstract static class StepContext implements org.apache.beam.runners.core.StepContext {
+  public abstract static class BaseStepContext implements org.apache.beam.runners.core.StepContext {
     private final ExecutionContext executionContext;
     private final String stepName;
     private final String transformName;
 
-    public StepContext(ExecutionContext executionContext, String stepName, String transformName) {
+    public BaseStepContext(ExecutionContext executionContext, String stepName, String transformName) {
       this.executionContext = executionContext;
       this.stepName = stepName;
       this.transformName = transformName;

http://git-wip-us.apache.org/repos/asf/beam/blob/8b7a1f6d/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
index abefd1c..3750e6c 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
@@ -29,7 +29,7 @@ import com.google.common.collect.ListMultimap;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import org.apache.beam.runners.core.BaseExecutionContext.StepContext;
+import org.apache.beam.runners.core.BaseExecutionContext.BaseStepContext;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.sdk.coders.Coder;
@@ -63,7 +63,8 @@ import org.mockito.MockitoAnnotations;
 public class SimpleDoFnRunnerTest {
   @Rule public ExpectedException thrown = ExpectedException.none();
 
-  @Mock StepContext mockStepContext;
+  @Mock
+  BaseStepContext mockStepContext;
 
   @Mock TimerInternals mockTimerInternals;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8b7a1f6d/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
index 5172f43..a335c3a 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
@@ -24,7 +24,7 @@ import static org.mockito.Mockito.when;
 
 import com.google.common.base.MoreObjects;
 import java.util.Collections;
-import org.apache.beam.runners.core.BaseExecutionContext.StepContext;
+import org.apache.beam.runners.core.BaseExecutionContext.BaseStepContext;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
@@ -69,7 +69,8 @@ public class StatefulDoFnRunnerTest {
   private static final IntervalWindow WINDOW_2 =
       new IntervalWindow(new Instant(10), new Instant(20));
 
-  @Mock StepContext mockStepContext;
+  @Mock
+  BaseStepContext mockStepContext;
 
   private InMemoryStateInternals<String> stateInternals;
   private InMemoryTimerInternals timerInternals;

http://git-wip-us.apache.org/repos/asf/beam/blob/8b7a1f6d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
index 107f39a..6d2d02a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
@@ -57,7 +57,7 @@ class DirectExecutionContext
    * Step Context for the {@link DirectRunner}.
    */
   public class DirectStepContext
-      extends BaseExecutionContext.StepContext {
+      extends BaseStepContext {
     private CopyOnAccessInMemoryStateInternals<?> stateInternals;
     private DirectTimerInternals timerInternals;
 


[07/16] beam git commit: Move BaseStepContext to the top level

Posted by ke...@apache.org.
Move BaseStepContext to the top level


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/59322d51
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/59322d51
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/59322d51

Branch: refs/heads/master
Commit: 59322d51e80e7480710a296f51a4cb65303f5e06
Parents: 8b7a1f6
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon May 22 15:35:46 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 23 11:16:27 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/BaseExecutionContext.java | 46 --------------
 .../beam/runners/core/BaseStepContext.java      | 66 ++++++++++++++++++++
 .../beam/runners/core/SimpleDoFnRunnerTest.java |  1 -
 .../runners/core/StatefulDoFnRunnerTest.java    |  1 -
 .../runners/direct/DirectExecutionContext.java  |  1 +
 5 files changed, 67 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/59322d51/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
index 5667250..877fa0a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
@@ -17,15 +17,10 @@
  */
 package org.apache.beam.runners.core;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * Base class for implementations of {@link ExecutionContext}.
@@ -104,45 +99,4 @@ public abstract class BaseExecutionContext<T extends StepContext>
     return Collections.unmodifiableCollection(cachedStepContexts.values());
   }
 
-  /**
-   * Base class for implementations of {@link org.apache.beam.runners.core.StepContext}.
-   *
-   * <p>To complete a concrete subclass, implement {@link #timerInternals} and
-   * {@link #stateInternals}.
-   */
-  public abstract static class BaseStepContext implements org.apache.beam.runners.core.StepContext {
-    private final ExecutionContext executionContext;
-    private final String stepName;
-    private final String transformName;
-
-    public BaseStepContext(ExecutionContext executionContext, String stepName, String transformName) {
-      this.executionContext = executionContext;
-      this.stepName = stepName;
-      this.transformName = transformName;
-    }
-
-    @Override
-    public String getStepName() {
-      return stepName;
-    }
-
-    @Override
-    public String getTransformName() {
-      return transformName;
-    }
-
-    @Override
-    public <T, W extends BoundedWindow> void writePCollectionViewData(
-        TupleTag<?> tag,
-        Iterable<WindowedValue<T>> data, Coder<Iterable<WindowedValue<T>>> dataCoder,
-        W window, Coder<W> windowCoder) throws IOException {
-      throw new UnsupportedOperationException("Not implemented.");
-    }
-
-    @Override
-    public abstract StateInternals stateInternals();
-
-    @Override
-    public abstract TimerInternals timerInternals();
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/59322d51/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java
new file mode 100644
index 0000000..f0436ac
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.io.IOException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Base class for implementations of {@link StepContext}.
+ *
+ * <p>To complete a concrete subclass, implement {@link #timerInternals} and
+ * {@link #stateInternals}.
+ */
+public abstract class BaseStepContext implements StepContext {
+  private final ExecutionContext executionContext;
+  private final String stepName;
+  private final String transformName;
+
+  public BaseStepContext(ExecutionContext executionContext, String stepName, String transformName) {
+    this.executionContext = executionContext;
+    this.stepName = stepName;
+    this.transformName = transformName;
+  }
+
+  @Override
+  public String getStepName() {
+    return stepName;
+  }
+
+  @Override
+  public String getTransformName() {
+    return transformName;
+  }
+
+  @Override
+  public <T, W extends BoundedWindow> void writePCollectionViewData(
+      TupleTag<?> tag,
+      Iterable<WindowedValue<T>> data, Coder<Iterable<WindowedValue<T>>> dataCoder,
+      W window, Coder<W> windowCoder) throws IOException {
+    throw new UnsupportedOperationException("Not implemented.");
+  }
+
+  @Override
+  public abstract StateInternals stateInternals();
+
+  @Override
+  public abstract TimerInternals timerInternals();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/59322d51/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
index 3750e6c..59e5857 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
@@ -29,7 +29,6 @@ import com.google.common.collect.ListMultimap;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import org.apache.beam.runners.core.BaseExecutionContext.BaseStepContext;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.sdk.coders.Coder;

http://git-wip-us.apache.org/repos/asf/beam/blob/59322d51/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
index a335c3a..62a6578 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
@@ -24,7 +24,6 @@ import static org.mockito.Mockito.when;
 
 import com.google.common.base.MoreObjects;
 import java.util.Collections;
-import org.apache.beam.runners.core.BaseExecutionContext.BaseStepContext;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;

http://git-wip-us.apache.org/repos/asf/beam/blob/59322d51/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
index 6d2d02a..e5b88e5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.direct;
 
 import org.apache.beam.runners.core.BaseExecutionContext;
+import org.apache.beam.runners.core.BaseStepContext;
 import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;


[13/16] beam git commit: Delete unused BaseStepContext

Posted by ke...@apache.org.
Delete unused BaseStepContext


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/62115b29
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/62115b29
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/62115b29

Branch: refs/heads/master
Commit: 62115b29a7f27a1a74b7c870d4277655adb3dfbf
Parents: dc58551
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon May 22 17:46:58 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 23 11:16:27 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/BaseStepContext.java      | 40 --------------------
 .../beam/runners/core/SimpleDoFnRunnerTest.java |  2 +-
 .../runners/core/StatefulDoFnRunnerTest.java    |  2 +-
 .../runners/direct/DirectExecutionContext.java  |  3 --
 4 files changed, 2 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/62115b29/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java
deleted file mode 100644
index 4abd4d2..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-/**
- * Base class for implementations of {@link StepContext}.
- *
- * <p>To complete a concrete subclass, implement {@link #timerInternals} and
- * {@link #stateInternals}.
- */
-public abstract class BaseStepContext implements StepContext {
-  private final String stepName;
-  private final String transformName;
-
-  public BaseStepContext(String stepName, String transformName) {
-    this.stepName = stepName;
-    this.transformName = transformName;
-  }
-
-  @Override
-  public abstract StateInternals stateInternals();
-
-  @Override
-  public abstract TimerInternals timerInternals();
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/62115b29/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
index 59e5857..f331b65 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
@@ -63,7 +63,7 @@ public class SimpleDoFnRunnerTest {
   @Rule public ExpectedException thrown = ExpectedException.none();
 
   @Mock
-  BaseStepContext mockStepContext;
+  StepContext mockStepContext;
 
   @Mock TimerInternals mockTimerInternals;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/62115b29/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
index 62a6578..4f155dc 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
@@ -69,7 +69,7 @@ public class StatefulDoFnRunnerTest {
       new IntervalWindow(new Instant(10), new Instant(20));
 
   @Mock
-  BaseStepContext mockStepContext;
+  StepContext mockStepContext;
 
   private InMemoryStateInternals<String> stateInternals;
   private InMemoryTimerInternals timerInternals;

http://git-wip-us.apache.org/repos/asf/beam/blob/62115b29/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
index 8452565..cca5719 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
@@ -17,11 +17,8 @@
  */
 package org.apache.beam.runners.direct;
 
-import java.util.Collection;
-import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
-import org.apache.beam.runners.core.BaseStepContext;
 import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;