You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/21 18:50:14 UTC
[1/2] beam git commit: Use dehydration-insensitive APIs in
WindowEvaluatorFactory
Repository: beam
Updated Branches:
refs/heads/master f870bf516 -> 8bd647596
Use dehydration-insensitive APIs in WindowEvaluatorFactory
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e38dc5fb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e38dc5fb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e38dc5fb
Branch: refs/heads/master
Commit: e38dc5fbe1f45fe282b4cd96b858631db73130e9
Parents: b6f126d
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Jun 7 13:58:11 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jul 21 11:36:21 2017 -0700
----------------------------------------------------------------------
.../construction/WindowIntoTranslation.java | 46 +++++++++--
.../WindowingStrategyTranslation.java | 83 ++++++++++++--------
.../construction/WindowIntoTranslationTest.java | 2 +-
.../runners/direct/WindowEvaluatorFactory.java | 5 +-
4 files changed, 95 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e38dc5fb/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
index aa17bc9..6aec908 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
@@ -18,15 +18,17 @@
package org.apache.beam.runners.core.construction;
+import static com.google.common.base.Preconditions.checkArgument;
+
import com.google.auto.service.AutoService;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.WindowIntoPayload;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
@@ -64,10 +66,44 @@ public class WindowIntoTranslation {
.build();
}
- public static WindowFn<?, ?> getWindowFn(WindowIntoPayload payload)
- throws InvalidProtocolBufferException {
- SdkFunctionSpec spec = payload.getWindowFn();
- return WindowingStrategyTranslation.windowFnFromProto(spec);
+ public static WindowIntoPayload getWindowIntoPayload(AppliedPTransform<?, ?, ?> application) {
+ RunnerApi.PTransform transformProto;
+ try {
+ transformProto =
+ PTransformTranslation.toProto(
+ application,
+ Collections.<AppliedPTransform<?, ?, ?>>emptyList(),
+ SdkComponents.create());
+ } catch (IOException exc) {
+ throw new RuntimeException(exc);
+ }
+
+ checkArgument(
+ PTransformTranslation.WINDOW_TRANSFORM_URN.equals(transformProto.getSpec().getUrn()),
+ "Illegal attempt to extract %s from transform %s with name \"%s\" and URN \"%s\"",
+ Window.Assign.class.getSimpleName(),
+ application.getTransform(),
+ application.getFullName(),
+ transformProto.getSpec().getUrn());
+
+ WindowIntoPayload windowIntoPayload;
+ try {
+ return transformProto.getSpec().getParameter().unpack(WindowIntoPayload.class);
+ } catch (InvalidProtocolBufferException exc) {
+ throw new IllegalStateException(
+ String.format(
+ "%s translated %s with URN '%s' but payload was not a %s",
+ PTransformTranslation.class.getSimpleName(),
+ application,
+ PTransformTranslation.WINDOW_TRANSFORM_URN,
+ WindowIntoPayload.class.getSimpleName()),
+ exc);
+ }
+ }
+
+ public static WindowFn<?, ?> getWindowFn(AppliedPTransform<?, ?, ?> application) {
+ return WindowingStrategyTranslation.windowFnFromProto(
+ getWindowIntoPayload(application).getWindowFn());
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/e38dc5fb/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
index 1456a3f..046153d 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
@@ -357,40 +357,55 @@ public class WindowingStrategyTranslation implements Serializable {
.withOnTimeBehavior(onTimeBehavior);
}
- public static WindowFn<?, ?> windowFnFromProto(SdkFunctionSpec windowFnSpec)
- throws InvalidProtocolBufferException {
- switch (windowFnSpec.getSpec().getUrn()) {
- case GLOBAL_WINDOWS_FN:
- return new GlobalWindows();
- case FIXED_WINDOWS_FN:
- StandardWindowFns.FixedWindowsPayload fixedParams =
- windowFnSpec.getSpec().getParameter().unpack(
- StandardWindowFns.FixedWindowsPayload.class);
- return FixedWindows.of(
- Duration.millis(Durations.toMillis(fixedParams.getSize())))
- .withOffset(Duration.millis(Timestamps.toMillis(fixedParams.getOffset())));
- case SLIDING_WINDOWS_FN:
- StandardWindowFns.SlidingWindowsPayload slidingParams =
- windowFnSpec.getSpec().getParameter().unpack(
- StandardWindowFns.SlidingWindowsPayload.class);
- return SlidingWindows.of(
- Duration.millis(Durations.toMillis(slidingParams.getSize())))
- .every(Duration.millis(Durations.toMillis(slidingParams.getPeriod())))
- .withOffset(Duration.millis(Timestamps.toMillis(slidingParams.getOffset())));
- case SESSION_WINDOWS_FN:
- StandardWindowFns.SessionsPayload sessionParams =
- windowFnSpec.getSpec().getParameter().unpack(
- StandardWindowFns.SessionsPayload.class);
- return Sessions.withGapDuration(
- Duration.millis(Durations.toMillis(sessionParams.getGapSize())));
- case SERIALIZED_JAVA_WINDOWFN_URN:
- case OLD_SERIALIZED_JAVA_WINDOWFN_URN:
- return (WindowFn<?, ?>) SerializableUtils.deserializeFromByteArray(
- windowFnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(),
- "WindowFn");
- default:
- throw new IllegalArgumentException(
- "Unknown or unsupported WindowFn: " + windowFnSpec.getSpec().getUrn());
+ public static WindowFn<?, ?> windowFnFromProto(SdkFunctionSpec windowFnSpec) {
+ try {
+ switch (windowFnSpec.getSpec().getUrn()) {
+ case GLOBAL_WINDOWS_FN:
+ return new GlobalWindows();
+ case FIXED_WINDOWS_FN:
+ StandardWindowFns.FixedWindowsPayload fixedParams =
+ windowFnSpec
+ .getSpec()
+ .getParameter()
+ .unpack(StandardWindowFns.FixedWindowsPayload.class);
+ return FixedWindows.of(Duration.millis(Durations.toMillis(fixedParams.getSize())))
+ .withOffset(Duration.millis(Timestamps.toMillis(fixedParams.getOffset())));
+ case SLIDING_WINDOWS_FN:
+ StandardWindowFns.SlidingWindowsPayload slidingParams =
+ windowFnSpec
+ .getSpec()
+ .getParameter()
+ .unpack(StandardWindowFns.SlidingWindowsPayload.class);
+ return SlidingWindows.of(Duration.millis(Durations.toMillis(slidingParams.getSize())))
+ .every(Duration.millis(Durations.toMillis(slidingParams.getPeriod())))
+ .withOffset(Duration.millis(Timestamps.toMillis(slidingParams.getOffset())));
+ case SESSION_WINDOWS_FN:
+ StandardWindowFns.SessionsPayload sessionParams =
+ windowFnSpec.getSpec().getParameter().unpack(StandardWindowFns.SessionsPayload.class);
+ return Sessions.withGapDuration(
+ Duration.millis(Durations.toMillis(sessionParams.getGapSize())));
+ case SERIALIZED_JAVA_WINDOWFN_URN:
+ case OLD_SERIALIZED_JAVA_WINDOWFN_URN:
+ return (WindowFn<?, ?>)
+ SerializableUtils.deserializeFromByteArray(
+ windowFnSpec
+ .getSpec()
+ .getParameter()
+ .unpack(BytesValue.class)
+ .getValue()
+ .toByteArray(),
+ "WindowFn");
+ default:
+ throw new IllegalArgumentException(
+ "Unknown or unsupported WindowFn: " + windowFnSpec.getSpec().getUrn());
+ }
+ } catch (InvalidProtocolBufferException exc) {
+ throw new IllegalArgumentException(
+ String.format(
+ "%s for %s with URN %s did not contain expected proto message for payload",
+ FunctionSpec.class.getSimpleName(),
+ WindowFn.class.getSimpleName(),
+ windowFnSpec.getSpec().getUrn()));
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e38dc5fb/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowIntoTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowIntoTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowIntoTranslationTest.java
index cb9617a..3ba2d6f 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowIntoTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowIntoTranslationTest.java
@@ -95,7 +95,7 @@ public class WindowIntoTranslationTest {
WindowIntoPayload payload =
WindowIntoTranslation.toProto(assign.get().getTransform(), components);
- assertEquals(windowFn, WindowIntoTranslation.getWindowFn(payload));
+ assertEquals(windowFn, WindowingStrategyTranslation.windowFnFromProto(payload.getWindowFn()));
}
private static class CustomWindows extends PartitioningWindowFn<String, BoundedWindow> {
http://git-wip-us.apache.org/repos/asf/beam/blob/e38dc5fb/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
index f4228d9..2d37b27 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.direct;
import com.google.common.collect.Iterables;
import java.util.Collection;
import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.WindowIntoTranslation;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -52,7 +53,9 @@ class WindowEvaluatorFactory implements TransformEvaluatorFactory {
private <InputT> TransformEvaluator<InputT> createTransformEvaluator(
AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Assign<InputT>>
transform) {
- WindowFn<? super InputT, ?> fn = transform.getTransform().getWindowFn();
+
+ WindowFn<? super InputT, ?> fn = (WindowFn) WindowIntoTranslation.getWindowFn(transform);
+
UncommittedBundle<InputT> outputBundle =
evaluationContext.createBundle(
(PCollection<InputT>) Iterables.getOnlyElement(transform.getOutputs().values()));
[2/2] beam git commit: This closes #3609: Use dehydration-insensitive
APIs in WindowEvaluatorFactory
Posted by ke...@apache.org.
This closes #3609: Use dehydration-insensitive APIs in WindowEvaluatorFactory
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8bd64759
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8bd64759
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8bd64759
Branch: refs/heads/master
Commit: 8bd64759616a2f262f60ce2998b98f0dd1dd2000
Parents: f870bf5 e38dc5f
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Jul 21 11:36:46 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jul 21 11:36:46 2017 -0700
----------------------------------------------------------------------
.../construction/WindowIntoTranslation.java | 46 +++++++++--
.../WindowingStrategyTranslation.java | 83 ++++++++++++--------
.../construction/WindowIntoTranslationTest.java | 2 +-
.../runners/direct/WindowEvaluatorFactory.java | 5 +-
4 files changed, 95 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/8bd64759/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
----------------------------------------------------------------------