You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/23 18:18:10 UTC

[11/16] beam git commit: Inline and delete BaseExecutionContext

Inline and delete BaseExecutionContext


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0be3cf34
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0be3cf34
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0be3cf34

Branch: refs/heads/master
Commit: 0be3cf3462c19f0b007b2329c95ea4865d22cad5
Parents: 32c6cb1
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon May 22 16:50:41 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 23 11:16:27 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/BaseExecutionContext.java | 102 -------------------
 .../runners/direct/DirectExecutionContext.java  |  39 +++++--
 2 files changed, 32 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0be3cf34/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
deleted file mode 100644
index 877fa0a..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-/**
- * Base class for implementations of {@link ExecutionContext}.
- *
- * <p>A concrete subclass should implement {@link #createStepContext} to create the appropriate
- * {@link BaseStepContext} implementation. Any {@code StepContext} created will
- * be cached for the lifetime of this {@link ExecutionContext}.
- *
- * <p>BaseExecutionContext is generic to allow implementing subclasses to return a concrete subclass
- * of {@link BaseStepContext} from {@link #getOrCreateStepContext(String, String)} and
- * {@link #getAllStepContexts()} without forcing each subclass to override the method, e.g.
- * <pre>{@code
- * {@literal @}Override
- * StreamingModeExecutionContext.StepContext getOrCreateStepContext(...) {
- *   return (StreamingModeExecutionContext.StepContext) super.getOrCreateStepContext(...);
- * }
- * }</pre>
- *
- * <p>When a subclass of {@code BaseExecutionContext} has been downcast, the return types of
- * {@link #createStepContext(String, String)},
- * {@link #getOrCreateStepContext(String, String)}, and {@link #getAllStepContexts()}
- * will be appropriately specialized.
- */
-public abstract class BaseExecutionContext<T extends StepContext>
-    implements ExecutionContext {
-
-  private Map<String, T> cachedStepContexts = new LinkedHashMap<>();
-
-  /**
-   * Implementations should override this to create the specific type
-   * of {@link BaseStepContext} they need.
-   */
-  protected abstract T createStepContext(String stepName, String transformName);
-
-  /**
-   * Returns the {@link BaseStepContext} associated with the given step.
-   */
-  @Override
-  public T getOrCreateStepContext(String stepName, String transformName) {
-    final String finalStepName = stepName;
-    final String finalTransformName = transformName;
-    return getOrCreateStepContext(
-        stepName,
-        new CreateStepContextFunction<T>() {
-          @Override
-          public T create() {
-            return createStepContext(finalStepName, finalTransformName);
-          }
-        });
-  }
-
-  /**
-   * Factory method interface to create an execution context if none exists during
-   * {@link #getOrCreateStepContext(String, CreateStepContextFunction)}.
-   */
-  protected interface CreateStepContextFunction<T extends org.apache.beam.runners.core.StepContext> {
-    T create();
-  }
-
-  protected final T getOrCreateStepContext(String stepName,
-      CreateStepContextFunction<T> createContextFunc) {
-    T context = cachedStepContexts.get(stepName);
-    if (context == null) {
-      context = createContextFunc.create();
-      cachedStepContexts.put(stepName, context);
-    }
-
-    return context;
-  }
-
-  /**
-   * Returns a collection view of all of the {@link BaseStepContext}s.
-   */
-  @Override
-  public Collection<? extends T> getAllStepContexts() {
-    return Collections.unmodifiableCollection(cachedStepContexts.values());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/0be3cf34/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
index 39174d6..9b68662 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
@@ -17,11 +17,14 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.runners.core.BaseExecutionContext;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.beam.runners.core.BaseStepContext;
 import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
 
@@ -31,12 +34,12 @@ import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
  * <p>This implementation is not thread safe. A new {@link DirectExecutionContext} must be created
  * for each thread that requires it.
  */
-class DirectExecutionContext
-    extends BaseExecutionContext<DirectStepContext> {
+class DirectExecutionContext implements ExecutionContext {
   private final Clock clock;
   private final StructuralKey<?> key;
   private final CopyOnAccessInMemoryStateInternals existingState;
   private final TransformWatermarks watermarks;
+  private Map<String, DirectStepContext> cachedStepContexts = new LinkedHashMap<>();
 
   public DirectExecutionContext(
       Clock clock,
@@ -49,9 +52,31 @@ class DirectExecutionContext
     this.watermarks = watermarks;
   }
 
+  private DirectStepContext createStepContext(String stepName, String transformName) {
+    return new DirectStepContext(stepName, transformName);
+  }
+
+  /**
+   * Returns the {@link BaseStepContext} associated with the given step.
+   */
+  @Override
+  public DirectStepContext getOrCreateStepContext(String stepName, String transformName) {
+    final String finalStepName = stepName;
+    final String finalTransformName = transformName;
+    DirectStepContext context = cachedStepContexts.get(stepName);
+    if (context == null) {
+      context = createStepContext(finalStepName, finalTransformName);
+      cachedStepContexts.put(stepName, context);
+    }
+    return context;
+  }
+
+  /**
+   * Returns a collection view of all of the {@link BaseStepContext}s.
+   */
   @Override
-  protected DirectStepContext createStepContext(String stepName, String transformName) {
-    return new DirectStepContext(this, stepName, transformName);
+  public Collection<? extends DirectStepContext> getAllStepContexts() {
+    return Collections.unmodifiableCollection(cachedStepContexts.values());
   }
 
   /**
@@ -64,7 +89,7 @@ class DirectExecutionContext
     private final String transformName;
 
     public DirectStepContext(
-        ExecutionContext executionContext, String stepName, String transformName) {
+        String stepName, String transformName) {
       this.stepName = stepName;
       this.transformName = transformName;
     }