You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/04/09 02:04:13 UTC
[1/2] incubator-beam git commit: Move expansion of Window.Bound into
DirectPipelineRunner
Repository: incubator-beam
Updated Branches:
refs/heads/master a43f9b820 -> a32a26208
Move expansion of Window.Bound into DirectPipelineRunner
In the Beam model, windowing is a primitive concept. The expansion provided
by the SDK cannot be implemented without access to privileged methods that
are not intended for Beam pipeline authors.
This change is a precursor to eliminating these privileged methods entirely.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/42969cb6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/42969cb6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/42969cb6
Branch: refs/heads/master
Commit: 42969cb62222744c41debe575857fb7d093ce527
Parents: 5f24cef
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Apr 7 17:34:19 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Apr 8 15:03:00 2016 -0700
----------------------------------------------------------------------
.../sdk/runners/DirectPipelineRunner.java | 47 ++++++++++++++++++++
.../inprocess/WindowEvaluatorFactory.java | 13 +++---
.../sdk/transforms/windowing/Window.java | 21 ++-------
.../cloud/dataflow/sdk/util/AssignWindows.java | 46 +++++++++++++++++++
4 files changed, 104 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42969cb6/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
index 35e392b..57e6116 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
@@ -47,7 +47,10 @@ import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.Partition;
import com.google.cloud.dataflow.sdk.transforms.Partition.PartitionFn;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
import com.google.cloud.dataflow.sdk.util.AppliedCombineFn;
+import com.google.cloud.dataflow.sdk.util.AssignWindows;
import com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly;
import com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
@@ -57,6 +60,7 @@ import com.google.cloud.dataflow.sdk.util.PerKeyCombineFnRunners;
import com.google.cloud.dataflow.sdk.util.SerializableUtils;
import com.google.cloud.dataflow.sdk.util.TestCredential;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.util.common.Counter;
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
import com.google.cloud.dataflow.sdk.values.KV;
@@ -255,6 +259,9 @@ public class DirectPipelineRunner
} else if (transform instanceof GroupByKey) {
return (OutputT)
((PCollection) input).apply(new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform));
+ } else if (transform instanceof Window.Bound) {
+ return (OutputT)
+ ((PCollection) input).apply(new AssignWindowsAndSetStrategy((Window.Bound) transform));
} else {
return super.apply(transform, input);
}
@@ -400,6 +407,46 @@ public class DirectPipelineRunner
}
}
+ private static class AssignWindowsAndSetStrategy<T, W extends BoundedWindow>
+ extends PTransform<PCollection<T>, PCollection<T>> {
+
+ private final Window.Bound<T> wrapped;
+
+ public AssignWindowsAndSetStrategy(Window.Bound<T> wrapped) {
+ this.wrapped = wrapped;
+ }
+
+ @Override
+ public PCollection<T> apply(PCollection<T> input) {
+ WindowingStrategy<?, ?> outputStrategy =
+ wrapped.getOutputStrategyInternal(input.getWindowingStrategy());
+
+ WindowFn<T, BoundedWindow> windowFn =
+ (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
+
+ // If the Window.Bound transform only changed parts other than the WindowFn, then
+ // we skip AssignWindows even though it should be harmless in a perfect world.
+ // The world is not perfect, and a GBK may have set it to InvalidWindows to forcibly
+ // crash if another GBK is performed without explicitly setting the WindowFn. So we skip
+ // AssignWindows in this case.
+ if (wrapped.getWindowFn() == null) {
+ return input.apply("Identity", ParDo.of(new IdentityFn<T>()))
+ .setWindowingStrategyInternal(outputStrategy);
+ } else {
+ return input
+ .apply("AssignWindows", new AssignWindows<T, BoundedWindow>(windowFn))
+ .setWindowingStrategyInternal(outputStrategy);
+ }
+ }
+ }
+
+ private static class IdentityFn<T> extends DoFn<T, T> {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(c.element());
+ }
+ }
+
/**
* Apply the override for AvroIO.Write.Bound if the user requested sharding controls
* greater than one.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42969cb6/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WindowEvaluatorFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WindowEvaluatorFactory.java
index 0bdfac9..e553dbb 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WindowEvaluatorFactory.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WindowEvaluatorFactory.java
@@ -61,23 +61,24 @@ class WindowEvaluatorFactory implements TransformEvaluatorFactory {
if (fn == null) {
return PassthroughTransformEvaluator.create(transform, outputBundle);
}
- return new WindowIntoEvaluator<>(fn, evaluationContext, outputBundle);
+ return new WindowIntoEvaluator<>(transform, fn, outputBundle);
}
private static class WindowIntoEvaluator<InputT> implements TransformEvaluator<InputT> {
+ private final AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>>
+ transform;
private final WindowFn<InputT, ?> windowFn;
- private final InProcessEvaluationContext context;
private final UncommittedBundle<InputT> outputBundle;
@SuppressWarnings("unchecked")
public WindowIntoEvaluator(
+ AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>> transform,
WindowFn<? super InputT, ?> windowFn,
- InProcessEvaluationContext context,
UncommittedBundle<InputT> outputBundle) {
+ this.outputBundle = outputBundle;
+ this.transform = transform;
// Safe contravariant cast
this.windowFn = (WindowFn<InputT, ?>) windowFn;
- this.context = context;
- this.outputBundle = outputBundle;
}
@Override
@@ -98,7 +99,7 @@ class WindowEvaluatorFactory implements TransformEvaluatorFactory {
@Override
public InProcessTransformResult finishBundle() throws Exception {
- return StepTransformResult.withoutHold(null).addOutput(outputBundle).build();
+ return StepTransformResult.withoutHold(transform).addOutput(outputBundle).build();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42969cb6/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Window.java
index 1e7282d..20b3ed5 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Window.java
@@ -25,7 +25,6 @@ import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.util.AssignWindowsDoFn;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode;
import com.google.cloud.dataflow.sdk.values.PCollection;
@@ -360,6 +359,7 @@ public class Window {
*/
public static class Bound<T> extends PTransform<PCollection<T>, PCollection<T>> {
+
@Nullable private final WindowFn<? super T, ?> windowFn;
@Nullable private final Trigger<?> trigger;
@Nullable private final AccumulationMode mode;
@@ -587,21 +587,8 @@ public class Window {
public PCollection<T> apply(PCollection<T> input) {
WindowingStrategy<?, ?> outputStrategy =
getOutputStrategyInternal(input.getWindowingStrategy());
- PCollection<T> output;
- if (windowFn != null) {
- // If the windowFn changed, we create a primitive, and run the AssignWindows operation here.
- output = assignWindows(input, windowFn);
- } else {
- // If the windowFn didn't change, we just run a pass-through transform and then set the
- // new windowing strategy.
- output = input.apply(Window.<T>identity());
- }
- return output.setWindowingStrategyInternal(outputStrategy);
- }
-
- private <T, W extends BoundedWindow> PCollection<T> assignWindows(
- PCollection<T> input, WindowFn<? super T, W> windowFn) {
- return input.apply("AssignWindows", ParDo.of(new AssignWindowsDoFn<T, W>(windowFn)));
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(), outputStrategy, input.isBounded());
}
@Override
@@ -639,7 +626,7 @@ public class Window {
* windows to be merged again as part of the next
* {@link com.google.cloud.dataflow.sdk.transforms.GroupByKey}.
*/
- public static class Remerge<T> extends PTransform<PCollection<T>, PCollection<T>> {
+ private static class Remerge<T> extends PTransform<PCollection<T>, PCollection<T>> {
@Override
public PCollection<T> apply(PCollection<T> input) {
WindowingStrategy<?, ?> outputWindowingStrategy = getOutputWindowing(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42969cb6/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/AssignWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/AssignWindows.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/AssignWindows.java
new file mode 100644
index 0000000..57f489d
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/AssignWindows.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.util;
+
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+/**
+ * {@link PTransform} that uses privileged (non-user-facing) APIs to assign elements of a
+ * {@link PCollection} to windows according to the provided {@link WindowFn}.
+ *
+ * @param <T> Type of elements being windowed
+ * @param <W> Window type
+ */
+public class AssignWindows<T, W extends BoundedWindow>
+ extends PTransform<PCollection<T>, PCollection<T>> {
+
+ private WindowFn<? super T, W> fn;
+
+ public AssignWindows(WindowFn<? super T, W> fn) {
+ this.fn = fn;
+ }
+
+ @Override
+ public PCollection<T> apply(PCollection<T> input) {
+ return input.apply("AssignWindows", ParDo.of(new AssignWindowsDoFn<>(fn)));
+ }
+}
[2/2] incubator-beam git commit: This closes #147
Posted by ke...@apache.org.
This closes #147
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a32a2620
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a32a2620
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a32a2620
Branch: refs/heads/master
Commit: a32a26208854a0875b10026d8f659ac275e37560
Parents: a43f9b8 42969cb
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Apr 8 17:04:03 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Apr 8 17:04:03 2016 -0700
----------------------------------------------------------------------
.../sdk/runners/DirectPipelineRunner.java | 47 ++++++++++++++++++++
.../inprocess/WindowEvaluatorFactory.java | 13 +++---
.../sdk/transforms/windowing/Window.java | 21 ++-------
.../cloud/dataflow/sdk/util/AssignWindows.java | 46 +++++++++++++++++++
4 files changed, 104 insertions(+), 23 deletions(-)
----------------------------------------------------------------------