You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/13 00:40:39 UTC
[08/50] [abbrv] incubator-beam git commit: Address comments of Flink
Side-Input PR
Address comments of Flink Side-Input PR
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6ae4b6a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6ae4b6a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6ae4b6a3
Branch: refs/heads/gearpump-runner
Commit: 6ae4b6a3df5cf3b834505fcb3f21df0e90473a0f
Parents: 8007bdf
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Aug 25 11:00:39 2016 +0200
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Sep 12 17:40:11 2016 -0700
----------------------------------------------------------------------
.../beam/runners/core/SideInputHandler.java | 6 +-
.../apache/beam/runners/flink/FlinkRunner.java | 86 ++++++++++++++++++--
.../wrappers/streaming/DoFnOperator.java | 13 ++-
.../wrappers/streaming/WindowDoFnOperator.java | 2 -
4 files changed, 89 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ae4b6a3/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
index a97d3f3..851ed37 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
@@ -60,7 +60,11 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
/** The list of side inputs that we're handling. */
protected final Collection<PCollectionView<?>> sideInputs;
- /** State internals that are scoped not to the key of a value but instead to one key group. */
+ /**
+ * State internals that are scoped not to the key of a value but are global. The state can still
+ * be kept locally but if side inputs are broadcast to all parallel operators then all will
+ * have the same view of the state.
+ */
private final StateInternals<Void> stateInternals;
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ae4b6a3/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index 8b1f42e..d3c65c0 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -25,8 +25,13 @@ import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
@@ -35,6 +40,7 @@ import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.TransformTreeNode;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
@@ -47,6 +53,8 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+
import org.apache.flink.api.common.JobExecutionResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -108,6 +116,7 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
private FlinkRunner(FlinkPipelineOptions options) {
this.options = options;
+ this.ptransformViewsWithNonDeterministicKeyCoders = new HashSet<>();
ImmutableMap.Builder<Class<?>, Class<?>> builder = ImmutableMap.<Class<?>, Class<?>>builder();
if (options.isStreaming()) {
@@ -124,6 +133,8 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
@Override
public FlinkRunnerResult run(Pipeline pipeline) {
+ logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
+
LOG.info("Executing pipeline using FlinkRunner.");
FlinkPipelineExecutionEnvironment env = new FlinkPipelineExecutionEnvironment(options);
@@ -176,6 +187,7 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
PTransform<InputT, OutputT> customTransform =
InstanceBuilder.ofType(customTransformClass)
+ .withArg(FlinkRunner.class, this)
.withArg(transformClass, transform)
.build();
@@ -223,6 +235,59 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
return files;
}
+ /** A set of {@link View}s with non-deterministic key coders. */
+ Set<PTransform<?, ?>> ptransformViewsWithNonDeterministicKeyCoders;
+
+ /**
+ * Records that the given view {@link PTransform} uses a non-deterministic key coder.
+ */
+ private void recordViewUsesNonDeterministicKeyCoder(PTransform<?, ?> ptransform) {
+ ptransformViewsWithNonDeterministicKeyCoders.add(ptransform);
+ }
+
+ /** Outputs a warning about PCollection views without deterministic key coders. */
+ private void logWarningIfPCollectionViewHasNonDeterministicKeyCoder(Pipeline pipeline) {
+ // We need to wait until this point to determine the names of the transforms, since only
+ // at this time do we know the hierarchy of the transforms; otherwise we could
+ // have just recorded the full names during apply time.
+ if (!ptransformViewsWithNonDeterministicKeyCoders.isEmpty()) {
+ final SortedSet<String> ptransformViewNamesWithNonDeterministicKeyCoders = new TreeSet<>();
+ pipeline.traverseTopologically(new Pipeline.PipelineVisitor() {
+ @Override
+ public void visitValue(PValue value, TransformTreeNode producer) {
+ }
+
+ @Override
+ public void visitPrimitiveTransform(TransformTreeNode node) {
+ if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
+ ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
+ }
+ }
+
+ @Override
+ public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
+ if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
+ ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
+ }
+ return CompositeBehavior.ENTER_TRANSFORM;
+ }
+
+ @Override
+ public void leaveCompositeTransform(TransformTreeNode node) {
+ }
+ });
+
+ LOG.warn("Unable to use indexed implementation for View.AsMap and View.AsMultimap for {} "
+ + "because the key coder is not deterministic. Falling back to singleton implementation "
+ + "which may cause memory and/or performance problems. Future major versions of "
+ + "the Flink runner will require deterministic key coders.",
+ ptransformViewNamesWithNonDeterministicKeyCoders);
+ }
+ }
+
+
+ /////////////////////////////////////////////////////////////////////////////
+
/**
* Specialized implementation for
* {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
@@ -231,8 +296,11 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
private static class StreamingViewAsMap<K, V>
extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
+ private final FlinkRunner runner;
+
@SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
- public StreamingViewAsMap(View.AsMap<K, V> transform) {
+ public StreamingViewAsMap(FlinkRunner runner, View.AsMap<K, V> transform) {
+ this.runner = runner;
}
@Override
@@ -248,7 +316,7 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
try {
inputCoder.getKeyCoder().verifyDeterministic();
} catch (Coder.NonDeterministicException e) {
-// runner.recordViewUsesNonDeterministicKeyCoder(this);
+ runner.recordViewUsesNonDeterministicKeyCoder(this);
}
return input
@@ -270,11 +338,14 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
private static class StreamingViewAsMultimap<K, V>
extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
+ private final FlinkRunner runner;
+
/**
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
- public StreamingViewAsMultimap(View.AsMultimap<K, V> transform) {
+ public StreamingViewAsMultimap(FlinkRunner runner, View.AsMultimap<K, V> transform) {
+ this.runner = runner;
}
@Override
@@ -290,7 +361,7 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
try {
inputCoder.getKeyCoder().verifyDeterministic();
} catch (Coder.NonDeterministicException e) {
-// runner.recordViewUsesNonDeterministicKeyCoder(this);
+ runner.recordViewUsesNonDeterministicKeyCoder(this);
}
return input
@@ -315,7 +386,7 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
- public StreamingViewAsList(View.AsList<T> transform) {}
+ public StreamingViewAsList(FlinkRunner runner, View.AsList<T> transform) {}
@Override
public PCollectionView<List<T>> apply(PCollection<T> input) {
@@ -346,7 +417,7 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
- public StreamingViewAsIterable(View.AsIterable<T> transform) { }
+ public StreamingViewAsIterable(FlinkRunner runner, View.AsIterable<T> transform) { }
@Override
public PCollectionView<Iterable<T>> apply(PCollection<T> input) {
@@ -386,7 +457,7 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
- public StreamingViewAsSingleton(View.AsSingleton<T> transform) {
+ public StreamingViewAsSingleton(FlinkRunner runner, View.AsSingleton<T> transform) {
this.transform = transform;
}
@@ -443,6 +514,7 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
*/
@SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
public StreamingCombineGloballyAsSingletonView(
+ FlinkRunner runner,
Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
this.transform = transform;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ae4b6a3/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 000d69f..2c7ebc6 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -75,11 +75,12 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
/**
- * Flink operator for executing {@link DoFn DoFns}.
+ * Flink operator for executing {@link OldDoFn DoFns}.
*
- * @param <InputT>
- * @param <FnOutputT>
- * @param <OutputT>
+ * @param <InputT> the input type of the {@link OldDoFn}
+ * @param <FnOutputT> the output type of the {@link OldDoFn}
+ * @param <OutputT> the output type of the operator, this can be different from the fn output type when we have
+ * side outputs
*/
public class DoFnOperator<InputT, FnOutputT, OutputT>
extends AbstractStreamOperator<OutputT>
@@ -95,8 +96,6 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
protected final Collection<PCollectionView<?>> sideInputs;
protected final Map<Integer, PCollectionView<?>> sideInputTagMapping;
- protected final boolean hasSideInputs;
-
protected final WindowingStrategy<?, ?> windowingStrategy;
protected final OutputManagerFactory<OutputT> outputManagerFactory;
@@ -136,8 +135,6 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
this.windowingStrategy = windowingStrategy;
this.outputManagerFactory = outputManagerFactory;
- this.hasSideInputs = !sideInputs.isEmpty();
-
this.pushedBackWatermarkDescriptor =
new ReducingStateDescriptor<>(
"pushed-back-elements-watermark-hold",
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ae4b6a3/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index c6dde51..01cfa5b 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -201,8 +201,6 @@ public class WindowDoFnOperator<K, InputT, OutputT>
if (timer != null && timer.f1.getTimestamp().getMillis() < actualInputWatermark) {
fire = true;
- System.out.println("FIRING: " + timer);
-
watermarkTimersQueue.remove();
watermarkTimers.remove(timer);