You are viewing a plain text version of this content. The canonical version was linked from the original page; the link target is not preserved in this plain-text copy.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/23 18:18:10 UTC
[11/16] beam git commit: Inline and delete BaseExecutionContext
Inline and delete BaseExecutionContext
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0be3cf34
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0be3cf34
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0be3cf34
Branch: refs/heads/master
Commit: 0be3cf3462c19f0b007b2329c95ea4865d22cad5
Parents: 32c6cb1
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon May 22 16:50:41 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 23 11:16:27 2017 -0700
----------------------------------------------------------------------
.../beam/runners/core/BaseExecutionContext.java | 102 -------------------
.../runners/direct/DirectExecutionContext.java | 39 +++++--
2 files changed, 32 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/0be3cf34/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
deleted file mode 100644
index 877fa0a..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-/**
- * Base class for implementations of {@link ExecutionContext}.
- *
- * <p>A concrete subclass should implement {@link #createStepContext} to create the appropriate
- * {@link BaseStepContext} implementation. Any {@code StepContext} created will
- * be cached for the lifetime of this {@link ExecutionContext}.
- *
- * <p>BaseExecutionContext is generic to allow implementing subclasses to return a concrete subclass
- * of {@link BaseStepContext} from {@link #getOrCreateStepContext(String, String)} and
- * {@link #getAllStepContexts()} without forcing each subclass to override the method, e.g.
- * <pre>{@code
- * {@literal @}Override
- * StreamingModeExecutionContext.StepContext getOrCreateStepContext(...) {
- * return (StreamingModeExecutionContext.StepContext) super.getOrCreateStepContext(...);
- * }
- * }</pre>
- *
- * <p>When a subclass of {@code BaseExecutionContext} has been downcast, the return types of
- * {@link #createStepContext(String, String)},
- * {@link #getOrCreateStepContext(String, String)}, and {@link #getAllStepContexts()}
- * will be appropriately specialized.
- */
-public abstract class BaseExecutionContext<T extends StepContext>
- implements ExecutionContext {
-
- private Map<String, T> cachedStepContexts = new LinkedHashMap<>();
-
- /**
- * Implementations should override this to create the specific type
- * of {@link BaseStepContext} they need.
- */
- protected abstract T createStepContext(String stepName, String transformName);
-
- /**
- * Returns the {@link BaseStepContext} associated with the given step.
- */
- @Override
- public T getOrCreateStepContext(String stepName, String transformName) {
- final String finalStepName = stepName;
- final String finalTransformName = transformName;
- return getOrCreateStepContext(
- stepName,
- new CreateStepContextFunction<T>() {
- @Override
- public T create() {
- return createStepContext(finalStepName, finalTransformName);
- }
- });
- }
-
- /**
- * Factory method interface to create an execution context if none exists during
- * {@link #getOrCreateStepContext(String, CreateStepContextFunction)}.
- */
- protected interface CreateStepContextFunction<T extends org.apache.beam.runners.core.StepContext> {
- T create();
- }
-
- protected final T getOrCreateStepContext(String stepName,
- CreateStepContextFunction<T> createContextFunc) {
- T context = cachedStepContexts.get(stepName);
- if (context == null) {
- context = createContextFunc.create();
- cachedStepContexts.put(stepName, context);
- }
-
- return context;
- }
-
- /**
- * Returns a collection view of all of the {@link BaseStepContext}s.
- */
- @Override
- public Collection<? extends T> getAllStepContexts() {
- return Collections.unmodifiableCollection(cachedStepContexts.values());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/0be3cf34/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
index 39174d6..9b68662 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
@@ -17,11 +17,14 @@
*/
package org.apache.beam.runners.direct;
-import org.apache.beam.runners.core.BaseExecutionContext;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.beam.runners.core.BaseStepContext;
import org.apache.beam.runners.core.ExecutionContext;
import org.apache.beam.runners.core.StepContext;
import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
@@ -31,12 +34,12 @@ import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
* <p>This implementation is not thread safe. A new {@link DirectExecutionContext} must be created
* for each thread that requires it.
*/
-class DirectExecutionContext
- extends BaseExecutionContext<DirectStepContext> {
+class DirectExecutionContext implements ExecutionContext {
private final Clock clock;
private final StructuralKey<?> key;
private final CopyOnAccessInMemoryStateInternals existingState;
private final TransformWatermarks watermarks;
+ private Map<String, DirectStepContext> cachedStepContexts = new LinkedHashMap<>();
public DirectExecutionContext(
Clock clock,
@@ -49,9 +52,31 @@ class DirectExecutionContext
this.watermarks = watermarks;
}
+ private DirectStepContext createStepContext(String stepName, String transformName) {
+ return new DirectStepContext(stepName, transformName);
+ }
+
+ /**
+ * Returns the {@link BaseStepContext} associated with the given step.
+ */
+ @Override
+ public DirectStepContext getOrCreateStepContext(String stepName, String transformName) {
+ final String finalStepName = stepName;
+ final String finalTransformName = transformName;
+ DirectStepContext context = cachedStepContexts.get(stepName);
+ if (context == null) {
+ context = createStepContext(finalStepName, finalTransformName);
+ cachedStepContexts.put(stepName, context);
+ }
+ return context;
+ }
+
+ /**
+ * Returns a collection view of all of the {@link BaseStepContext}s.
+ */
@Override
- protected DirectStepContext createStepContext(String stepName, String transformName) {
- return new DirectStepContext(this, stepName, transformName);
+ public Collection<? extends DirectStepContext> getAllStepContexts() {
+ return Collections.unmodifiableCollection(cachedStepContexts.values());
}
/**
@@ -64,7 +89,7 @@ class DirectExecutionContext
private final String transformName;
public DirectStepContext(
- ExecutionContext executionContext, String stepName, String transformName) {
+ String stepName, String transformName) {
this.stepName = stepName;
this.transformName = transformName;
}