You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/21 18:42:13 UTC
[1/3] incubator-beam git commit: Move ExecutionContext and related
classes to runners-core
Repository: incubator-beam
Updated Branches:
refs/heads/master 4843dc59c -> a9447a225
Move ExecutionContext and related classes to runners-core
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9d2b8e09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9d2b8e09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9d2b8e09
Branch: refs/heads/master
Commit: 9d2b8e09bcb5e04017b487e1a919d335875dbfc0
Parents: 64336e4
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Dec 15 20:20:34 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Dec 21 10:10:00 2016 -0800
----------------------------------------------------------------------
.../operators/ApexParDoOperator.java | 2 +-
.../apex/translation/utils/NoOpStepContext.java | 3 +-
.../beam/runners/core/AggregatorFactory.java | 1 -
.../beam/runners/core/BaseExecutionContext.java | 176 +++++++++++++++++++
.../apache/beam/runners/core/DoFnRunners.java | 2 +-
.../beam/runners/core/ExecutionContext.java | 102 +++++++++++
.../beam/runners/core/SimpleDoFnRunner.java | 2 +-
.../beam/runners/core/SimpleOldDoFnRunner.java | 2 +-
.../beam/runners/core/SimpleDoFnRunnerTest.java | 2 +-
.../runners/core/SimpleOldDoFnRunnerTest.java | 3 +-
.../runners/direct/AggregatorContainer.java | 2 +-
.../runners/direct/DirectExecutionContext.java | 6 +-
.../beam/runners/direct/EvaluationContext.java | 2 +-
.../runners/direct/AggregatorContainerTest.java | 2 +-
.../wrappers/streaming/DoFnOperator.java | 2 +-
.../wrappers/streaming/WindowDoFnOperator.java | 2 +-
.../spark/aggregators/SparkAggregators.java | 2 +-
.../spark/translation/SparkProcessContext.java | 2 +-
.../beam/sdk/util/BaseExecutionContext.java | 174 ------------------
.../apache/beam/sdk/util/ExecutionContext.java | 100 -----------
20 files changed, 295 insertions(+), 294 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index a3d3a97..c41cd45 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -41,6 +41,7 @@ import org.apache.beam.runners.core.AggregatorFactory;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.runners.core.ExecutionContext;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.sdk.coders.Coder;
@@ -50,7 +51,6 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnAdapters;
import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.util.ExecutionContext;
import org.apache.beam.sdk.util.NullSideInputReader;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.UserCodeException;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
index 078f95f..f169ae6 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
@@ -19,10 +19,9 @@ package org.apache.beam.runners.apex.translation.utils;
import java.io.IOException;
import java.io.Serializable;
-
+import org.apache.beam.runners.core.ExecutionContext;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.ExecutionContext;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.state.StateInternals;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java
index 153d30d..24a605f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.core;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.ExecutionContext;
/**
* A factory for creating aggregators.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
new file mode 100644
index 0000000..7b674dc
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Base class for implementations of {@link ExecutionContext}.
+ *
+ * <p>A concrete subclass should implement {@link #createStepContext} to create the appropriate
+ * {@link StepContext} implementation. Any {@code StepContext} created will
+ * be cached for the lifetime of this {@link ExecutionContext}.
+ *
+ * <p>BaseExecutionContext is generic to allow implementing subclasses to return a concrete subclass
+ * of {@link StepContext} from {@link #getOrCreateStepContext(String, String)} and
+ * {@link #getAllStepContexts()} without forcing each subclass to override the method, e.g.
+ * <pre>{@code
+ * {@literal @}Override
+ * StreamingModeExecutionContext.StepContext getOrCreateStepContext(...) {
+ * return (StreamingModeExecutionContext.StepContext) super.getOrCreateStepContext(...);
+ * }
+ * }</pre>
+ *
+ * <p>When a subclass of {@code BaseExecutionContext} has been downcast, the return types of
+ * {@link #createStepContext(String, String)},
+ * {@link #getOrCreateStepContext(String, String)}, and {@link #getAllStepContexts()}
+ * will be appropriately specialized.
+ */
+public abstract class BaseExecutionContext<T extends ExecutionContext.StepContext>
+ implements ExecutionContext {
+
+ private Map<String, T> cachedStepContexts = new HashMap<>();
+
+ /**
+ * Implementations should override this to create the specific type
+ * of {@link StepContext} they need.
+ */
+ protected abstract T createStepContext(String stepName, String transformName);
+
+ /**
+ * Returns the {@link StepContext} associated with the given step.
+ */
+ @Override
+ public T getOrCreateStepContext(String stepName, String transformName) {
+ final String finalStepName = stepName;
+ final String finalTransformName = transformName;
+ return getOrCreateStepContext(
+ stepName,
+ new CreateStepContextFunction<T>() {
+ @Override
+ public T create() {
+ return createStepContext(finalStepName, finalTransformName);
+ }
+ });
+ }
+
+ /**
+ * Factory method interface to create an execution context if none exists during
+ * {@link #getOrCreateStepContext(String, CreateStepContextFunction)}.
+ */
+ protected interface CreateStepContextFunction<T extends ExecutionContext.StepContext> {
+ T create();
+ }
+
+ protected final T getOrCreateStepContext(String stepName,
+ CreateStepContextFunction<T> createContextFunc) {
+ T context = cachedStepContexts.get(stepName);
+ if (context == null) {
+ context = createContextFunc.create();
+ cachedStepContexts.put(stepName, context);
+ }
+
+ return context;
+ }
+
+ /**
+ * Returns a collection view of all of the {@link StepContext}s.
+ */
+ @Override
+ public Collection<? extends T> getAllStepContexts() {
+ return Collections.unmodifiableCollection(cachedStepContexts.values());
+ }
+
+ /**
+ * Hook for subclasses to implement that will be called whenever
+ * {@code DoFn.Context#output}
+ * is called.
+ */
+ @Override
+ public void noteOutput(WindowedValue<?> output) {}
+
+ /**
+ * Hook for subclasses to implement that will be called whenever
+ * {@code DoFn.Context#sideOutput}
+ * is called.
+ */
+ @Override
+ public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {}
+
+ /**
+ * Base class for implementations of {@link ExecutionContext.StepContext}.
+ *
+ * <p>To complete a concrete subclass, implement {@link #timerInternals} and
+ * {@link #stateInternals}.
+ */
+ public abstract static class StepContext implements ExecutionContext.StepContext {
+ private final ExecutionContext executionContext;
+ private final String stepName;
+ private final String transformName;
+
+ public StepContext(ExecutionContext executionContext, String stepName, String transformName) {
+ this.executionContext = executionContext;
+ this.stepName = stepName;
+ this.transformName = transformName;
+ }
+
+ @Override
+ public String getStepName() {
+ return stepName;
+ }
+
+ @Override
+ public String getTransformName() {
+ return transformName;
+ }
+
+ @Override
+ public void noteOutput(WindowedValue<?> output) {
+ executionContext.noteOutput(output);
+ }
+
+ @Override
+ public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {
+ executionContext.noteSideOutput(tag, output);
+ }
+
+ @Override
+ public <T, W extends BoundedWindow> void writePCollectionViewData(
+ TupleTag<?> tag,
+ Iterable<WindowedValue<T>> data, Coder<Iterable<WindowedValue<T>>> dataCoder,
+ W window, Coder<W> windowCoder) throws IOException {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
+ public abstract StateInternals<?> stateInternals();
+
+ @Override
+ public abstract TimerInternals timerInternals();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index 0e4bf75..820bfcd 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -19,13 +19,13 @@ package org.apache.beam.runners.core;
import java.util.List;
import org.apache.beam.runners.core.DoFnRunner.ReduceFnExecutor;
+import org.apache.beam.runners.core.ExecutionContext.StepContext;
import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.ExecutionContext.StepContext;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
new file mode 100644
index 0000000..f67aff4
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.io.IOException;
+import java.util.Collection;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Context for the current execution. This is guaranteed to exist during processing,
+ * but does not necessarily persist between different batches of work.
+ */
+public interface ExecutionContext {
+ /**
+ * Returns the {@link StepContext} associated with the given step.
+ */
+ StepContext getOrCreateStepContext(String stepName, String transformName);
+
+ /**
+ * Returns a collection view of all of the {@link StepContext}s.
+ */
+ Collection<? extends StepContext> getAllStepContexts();
+
+ /**
+ * Hook for subclasses to implement that will be called whenever
+ * {@link org.apache.beam.sdk.transforms.DoFn.Context#output}
+ * is called.
+ */
+ void noteOutput(WindowedValue<?> output);
+
+ /**
+ * Hook for subclasses to implement that will be called whenever
+ * {@link org.apache.beam.sdk.transforms.DoFn.Context#sideOutput}
+ * is called.
+ */
+ void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output);
+
+ /**
+ * Per-step, per-key context used for retrieving state.
+ */
+ public interface StepContext {
+
+ /**
+ * The name of the step.
+ */
+ String getStepName();
+
+ /**
+ * The name of the transform for the step.
+ */
+ String getTransformName();
+
+ /**
+ * Hook for subclasses to implement that will be called whenever
+ * {@link org.apache.beam.sdk.transforms.DoFn.Context#output}
+ * is called.
+ */
+ void noteOutput(WindowedValue<?> output);
+
+ /**
+ * Hook for subclasses to implement that will be called whenever
+ * {@link org.apache.beam.sdk.transforms.DoFn.Context#sideOutput}
+ * is called.
+ */
+ void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output);
+
+ /**
+ * Writes the given {@code PCollectionView} data to a globally accessible location.
+ */
+ <T, W extends BoundedWindow> void writePCollectionViewData(
+ TupleTag<?> tag,
+ Iterable<WindowedValue<T>> data,
+ Coder<Iterable<WindowedValue<T>>> dataCoder,
+ W window,
+ Coder<W> windowCoder)
+ throws IOException;
+
+ StateInternals<?> stateInternals();
+
+ TimerInternals timerInternals();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index a7d82bf..b42c57d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.runners.core.ExecutionContext.StepContext;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
@@ -47,7 +48,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.ExecutionContext.StepContext;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.TimeDomain;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
index 342a4a8..1ff0212 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.runners.core.ExecutionContext.StepContext;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
@@ -36,7 +37,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.ExecutionContext.StepContext;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.TimeDomain;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
index ec5d375..8ae09cb 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
@@ -26,13 +26,13 @@ import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import org.apache.beam.runners.core.BaseExecutionContext.StepContext;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.BaseExecutionContext.StepContext;
import org.apache.beam.sdk.util.NullSideInputReader;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.Timer;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
index 0e23dcb..4610069 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
@@ -22,9 +22,8 @@ import static org.mockito.Mockito.mock;
import java.util.Arrays;
import java.util.List;
-
+import org.apache.beam.runners.core.BaseExecutionContext.StepContext;
import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.util.BaseExecutionContext.StepContext;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
index c7fa4df..fd17704 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
@@ -28,9 +28,9 @@ import java.util.concurrent.ConcurrentMap;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.runners.core.AggregatorFactory;
+import org.apache.beam.runners.core.ExecutionContext;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.util.ExecutionContext;
/**
* AccumT container for the current values associated with {@link Aggregator Aggregators}.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
index c6051f0..8250cf1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
@@ -17,11 +17,11 @@
*/
package org.apache.beam.runners.direct;
+import org.apache.beam.runners.core.BaseExecutionContext;
+import org.apache.beam.runners.core.ExecutionContext;
import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
-import org.apache.beam.sdk.util.BaseExecutionContext;
-import org.apache.beam.sdk.util.ExecutionContext;
import org.apache.beam.sdk.util.TimerInternals;
/**
@@ -54,7 +54,7 @@ class DirectExecutionContext
* Step Context for the {@link DirectRunner}.
*/
public class DirectStepContext
- extends org.apache.beam.sdk.util.BaseExecutionContext.StepContext {
+ extends BaseExecutionContext.StepContext {
private CopyOnAccessInMemoryStateInternals<Object> stateInternals;
private DirectTimerInternals timerInternals;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index bbcab8e..3b9367a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -31,6 +31,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.annotation.Nullable;
+import org.apache.beam.runners.core.ExecutionContext;
import org.apache.beam.runners.direct.CommittedResult.OutputType;
import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
@@ -43,7 +44,6 @@ import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.util.ExecutionContext;
import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.TimerInternals.TimerData;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
index c8310c9..f770800 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
@@ -24,9 +24,9 @@ import static org.mockito.Mockito.when;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import org.apache.beam.runners.core.ExecutionContext.StepContext;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Sum.SumIntegerFn;
-import org.apache.beam.sdk.util.ExecutionContext.StepContext;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 8704308..057a3e7 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -31,6 +31,7 @@ import java.util.Map;
import org.apache.beam.runners.core.AggregatorFactory;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.ExecutionContext;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
@@ -46,7 +47,6 @@ import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.ExecutionContext;
import org.apache.beam.sdk.util.NullSideInputReader;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.TimerInternals;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 9cea529..9855d46 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -38,6 +38,7 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import javax.annotation.Nullable;
+import org.apache.beam.runners.core.ExecutionContext;
import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItems;
@@ -48,7 +49,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.ExecutionContext;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
index 17d5844..fa5c8d1 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
@@ -22,11 +22,11 @@ import com.google.common.collect.ImmutableList;
import java.util.Collection;
import java.util.Map;
import org.apache.beam.runners.core.AggregatorFactory;
+import org.apache.beam.runners.core.ExecutionContext;
import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.util.ExecutionContext;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaSparkContext;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
index 3a31cae..9957bf3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
@@ -24,11 +24,11 @@ import java.io.IOException;
import java.util.Iterator;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.runners.core.ExecutionContext.StepContext;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.ExecutionContext.StepContext;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.state.StateInternals;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java
deleted file mode 100644
index e26f2b0..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * Base class for implementations of {@link ExecutionContext}.
- *
- * <p>A concrete subclass should implement {@link #createStepContext} to create the appropriate
- * {@link StepContext} implementation. Any {@code StepContext} created will
- * be cached for the lifetime of this {@link ExecutionContext}.
- *
- * <p>BaseExecutionContext is generic to allow implementing subclasses to return a concrete subclass
- * of {@link StepContext} from {@link #getOrCreateStepContext(String, String)} and
- * {@link #getAllStepContexts()} without forcing each subclass to override the method, e.g.
- * <pre>{@code
- * {@literal @}Override
- * StreamingModeExecutionContext.StepContext getOrCreateStepContext(...) {
- * return (StreamingModeExecutionContext.StepContext) super.getOrCreateStepContext(...);
- * }
- * }</pre>
- *
- * <p>When a subclass of {@code BaseExecutionContext} has been downcast, the return types of
- * {@link #createStepContext(String, String)},
- * {@link #getOrCreateStepContext(String, String)}, and {@link #getAllStepContexts()}
- * will be appropriately specialized.
- */
-public abstract class BaseExecutionContext<T extends ExecutionContext.StepContext>
- implements ExecutionContext {
-
- private Map<String, T> cachedStepContexts = new HashMap<>();
-
- /**
- * Implementations should override this to create the specific type
- * of {@link StepContext} they need.
- */
- protected abstract T createStepContext(String stepName, String transformName);
-
- /**
- * Returns the {@link StepContext} associated with the given step.
- */
- @Override
- public T getOrCreateStepContext(String stepName, String transformName) {
- final String finalStepName = stepName;
- final String finalTransformName = transformName;
- return getOrCreateStepContext(
- stepName,
- new CreateStepContextFunction<T>() {
- @Override
- public T create() {
- return createStepContext(finalStepName, finalTransformName);
- }
- });
- }
-
- /**
- * Factory method interface to create an execution context if none exists during
- * {@link #getOrCreateStepContext(String, CreateStepContextFunction)}.
- */
- protected interface CreateStepContextFunction<T extends ExecutionContext.StepContext> {
- T create();
- }
-
- protected final T getOrCreateStepContext(String stepName,
- CreateStepContextFunction<T> createContextFunc) {
- T context = cachedStepContexts.get(stepName);
- if (context == null) {
- context = createContextFunc.create();
- cachedStepContexts.put(stepName, context);
- }
-
- return context;
- }
-
- /**
- * Returns a collection view of all of the {@link StepContext}s.
- */
- @Override
- public Collection<? extends T> getAllStepContexts() {
- return Collections.unmodifiableCollection(cachedStepContexts.values());
- }
-
- /**
- * Hook for subclasses to implement that will be called whenever
- * {@code DoFn.Context#output}
- * is called.
- */
- @Override
- public void noteOutput(WindowedValue<?> output) {}
-
- /**
- * Hook for subclasses to implement that will be called whenever
- * {@code DoFn.Context#sideOutput}
- * is called.
- */
- @Override
- public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {}
-
- /**
- * Base class for implementations of {@link ExecutionContext.StepContext}.
- *
- * <p>To complete a concrete subclass, implement {@link #timerInternals} and
- * {@link #stateInternals}.
- */
- public abstract static class StepContext implements ExecutionContext.StepContext {
- private final ExecutionContext executionContext;
- private final String stepName;
- private final String transformName;
-
- public StepContext(ExecutionContext executionContext, String stepName, String transformName) {
- this.executionContext = executionContext;
- this.stepName = stepName;
- this.transformName = transformName;
- }
-
- @Override
- public String getStepName() {
- return stepName;
- }
-
- @Override
- public String getTransformName() {
- return transformName;
- }
-
- @Override
- public void noteOutput(WindowedValue<?> output) {
- executionContext.noteOutput(output);
- }
-
- @Override
- public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {
- executionContext.noteSideOutput(tag, output);
- }
-
- @Override
- public <T, W extends BoundedWindow> void writePCollectionViewData(
- TupleTag<?> tag,
- Iterable<WindowedValue<T>> data, Coder<Iterable<WindowedValue<T>>> dataCoder,
- W window, Coder<W> windowCoder) throws IOException {
- throw new UnsupportedOperationException("Not implemented.");
- }
-
- @Override
- public abstract StateInternals<?> stateInternals();
-
- @Override
- public abstract TimerInternals timerInternals();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java
deleted file mode 100644
index 4429d76..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import java.io.IOException;
-import java.util.Collection;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * Context for the current execution. This is guaranteed to exist during processing,
- * but does not necessarily persist between different batches of work.
- */
-public interface ExecutionContext {
- /**
- * Returns the {@link StepContext} associated with the given step.
- */
- StepContext getOrCreateStepContext(String stepName, String transformName);
-
- /**
- * Returns a collection view of all of the {@link StepContext}s.
- */
- Collection<? extends StepContext> getAllStepContexts();
-
- /**
- * Hook for subclasses to implement that will be called whenever
- * {@link org.apache.beam.sdk.transforms.DoFn.Context#output}
- * is called.
- */
- void noteOutput(WindowedValue<?> output);
-
- /**
- * Hook for subclasses to implement that will be called whenever
- * {@link org.apache.beam.sdk.transforms.DoFn.Context#sideOutput}
- * is called.
- */
- void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output);
-
- /**
- * Per-step, per-key context used for retrieving state.
- */
- public interface StepContext {
-
- /**
- * The name of the step.
- */
- String getStepName();
-
- /**
- * The name of the transform for the step.
- */
- String getTransformName();
-
- /**
- * Hook for subclasses to implement that will be called whenever
- * {@link org.apache.beam.sdk.transforms.DoFn.Context#output}
- * is called.
- */
- void noteOutput(WindowedValue<?> output);
-
- /**
- * Hook for subclasses to implement that will be called whenever
- * {@link org.apache.beam.sdk.transforms.DoFn.Context#sideOutput}
- * is called.
- */
- void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output);
-
- /**
- * Writes the given {@code PCollectionView} data to a globally accessible location.
- */
- <T, W extends BoundedWindow> void writePCollectionViewData(
- TupleTag<?> tag,
- Iterable<WindowedValue<T>> data,
- Coder<Iterable<WindowedValue<T>>> dataCoder,
- W window,
- Coder<W> windowCoder)
- throws IOException;
-
- StateInternals<?> stateInternals();
-
- TimerInternals timerInternals();
- }
-}
[2/3] incubator-beam git commit: Update Dataflow worker to
beam-master-20161221
Posted by ke...@apache.org.
Update Dataflow worker to beam-master-20161221
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/64336e40
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/64336e40
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/64336e40
Branch: refs/heads/master
Commit: 64336e40dd6a48b3b6b48634bb9204db0aa0c7ca
Parents: 0d0a5e2
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Dec 21 10:09:49 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Dec 21 10:10:00 2016 -0800
----------------------------------------------------------------------
.../org/apache/beam/runners/dataflow/dataflow.properties | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/64336e40/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
index bf08e83..2912f61 100644
--- a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
+++ b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
@@ -18,6 +18,6 @@
environment.major.version=6
-worker.image.batch=dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20161220
+worker.image.batch=dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20161221
-worker.image.streaming=dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20161220
+worker.image.streaming=dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20161221
[3/3] incubator-beam git commit: This closes #1666: Move
ExecutionContext and related classes to runners-core
Posted by ke...@apache.org.
This closes #1666: Move ExecutionContext and related classes to runners-core
Move ExecutionContext and related classes to runners-core
Update Dataflow worker to beam-master-20161221
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a9447a22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a9447a22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a9447a22
Branch: refs/heads/master
Commit: a9447a2251f46496b7a773c8b07b3281dbc7a6fb
Parents: 4843dc5 9d2b8e0
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Dec 21 10:24:11 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Dec 21 10:24:11 2016 -0800
----------------------------------------------------------------------
.../operators/ApexParDoOperator.java | 2 +-
.../apex/translation/utils/NoOpStepContext.java | 3 +-
.../beam/runners/core/AggregatorFactory.java | 1 -
.../beam/runners/core/BaseExecutionContext.java | 176 +++++++++++++++++++
.../apache/beam/runners/core/DoFnRunners.java | 2 +-
.../beam/runners/core/ExecutionContext.java | 102 +++++++++++
.../beam/runners/core/SimpleDoFnRunner.java | 2 +-
.../beam/runners/core/SimpleOldDoFnRunner.java | 2 +-
.../beam/runners/core/SimpleDoFnRunnerTest.java | 2 +-
.../runners/core/SimpleOldDoFnRunnerTest.java | 3 +-
.../runners/direct/AggregatorContainer.java | 2 +-
.../runners/direct/DirectExecutionContext.java | 6 +-
.../beam/runners/direct/EvaluationContext.java | 2 +-
.../runners/direct/AggregatorContainerTest.java | 2 +-
.../wrappers/streaming/DoFnOperator.java | 2 +-
.../wrappers/streaming/WindowDoFnOperator.java | 2 +-
.../beam/runners/dataflow/dataflow.properties | 4 +-
.../spark/aggregators/SparkAggregators.java | 2 +-
.../spark/translation/SparkProcessContext.java | 2 +-
.../beam/sdk/util/BaseExecutionContext.java | 174 ------------------
.../apache/beam/sdk/util/ExecutionContext.java | 100 -----------
21 files changed, 297 insertions(+), 296 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a9447a22/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------