You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/21 18:50:14 UTC

[1/2] beam git commit: Use dehydration-insensitive APIs in WindowEvaluatorFactory

Repository: beam
Updated Branches:
  refs/heads/master f870bf516 -> 8bd647596


Use dehydration-insensitive APIs in WindowEvaluatorFactory


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e38dc5fb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e38dc5fb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e38dc5fb

Branch: refs/heads/master
Commit: e38dc5fbe1f45fe282b4cd96b858631db73130e9
Parents: b6f126d
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Jun 7 13:58:11 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jul 21 11:36:21 2017 -0700

----------------------------------------------------------------------
 .../construction/WindowIntoTranslation.java     | 46 +++++++++--
 .../WindowingStrategyTranslation.java           | 83 ++++++++++++--------
 .../construction/WindowIntoTranslationTest.java |  2 +-
 .../runners/direct/WindowEvaluatorFactory.java  |  5 +-
 4 files changed, 95 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e38dc5fb/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
index aa17bc9..6aec908 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
@@ -18,15 +18,17 @@
 
 package org.apache.beam.runners.core.construction;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import com.google.auto.service.AutoService;
 import com.google.protobuf.Any;
 import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.WindowIntoPayload;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -64,10 +66,44 @@ public class WindowIntoTranslation {
         .build();
   }
 
-  public static WindowFn<?, ?> getWindowFn(WindowIntoPayload payload)
-      throws InvalidProtocolBufferException {
-    SdkFunctionSpec spec = payload.getWindowFn();
-    return WindowingStrategyTranslation.windowFnFromProto(spec);
+  public static WindowIntoPayload getWindowIntoPayload(AppliedPTransform<?, ?, ?> application) {
+    RunnerApi.PTransform transformProto;
+    try {
+      transformProto =
+          PTransformTranslation.toProto(
+              application,
+              Collections.<AppliedPTransform<?, ?, ?>>emptyList(),
+              SdkComponents.create());
+    } catch (IOException exc) {
+      throw new RuntimeException(exc);
+    }
+
+    checkArgument(
+        PTransformTranslation.WINDOW_TRANSFORM_URN.equals(transformProto.getSpec().getUrn()),
+        "Illegal attempt to extract %s from transform %s with name \"%s\" and URN \"%s\"",
+        Window.Assign.class.getSimpleName(),
+        application.getTransform(),
+        application.getFullName(),
+        transformProto.getSpec().getUrn());
+
+    WindowIntoPayload windowIntoPayload;
+    try {
+      return transformProto.getSpec().getParameter().unpack(WindowIntoPayload.class);
+    } catch (InvalidProtocolBufferException exc) {
+      throw new IllegalStateException(
+          String.format(
+              "%s translated %s with URN '%s' but payload was not a %s",
+              PTransformTranslation.class.getSimpleName(),
+              application,
+              PTransformTranslation.WINDOW_TRANSFORM_URN,
+              WindowIntoPayload.class.getSimpleName()),
+          exc);
+    }
+  }
+
+  public static WindowFn<?, ?> getWindowFn(AppliedPTransform<?, ?, ?> application) {
+    return WindowingStrategyTranslation.windowFnFromProto(
+        getWindowIntoPayload(application).getWindowFn());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/e38dc5fb/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
index 1456a3f..046153d 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
@@ -357,40 +357,55 @@ public class WindowingStrategyTranslation implements Serializable {
         .withOnTimeBehavior(onTimeBehavior);
   }
 
-  public static WindowFn<?, ?> windowFnFromProto(SdkFunctionSpec windowFnSpec)
-      throws InvalidProtocolBufferException {
-    switch (windowFnSpec.getSpec().getUrn()) {
-      case GLOBAL_WINDOWS_FN:
-        return new GlobalWindows();
-      case FIXED_WINDOWS_FN:
-        StandardWindowFns.FixedWindowsPayload fixedParams =
-            windowFnSpec.getSpec().getParameter().unpack(
-                StandardWindowFns.FixedWindowsPayload.class);
-        return FixedWindows.of(
-            Duration.millis(Durations.toMillis(fixedParams.getSize())))
-            .withOffset(Duration.millis(Timestamps.toMillis(fixedParams.getOffset())));
-      case SLIDING_WINDOWS_FN:
-        StandardWindowFns.SlidingWindowsPayload slidingParams =
-            windowFnSpec.getSpec().getParameter().unpack(
-                StandardWindowFns.SlidingWindowsPayload.class);
-        return SlidingWindows.of(
-            Duration.millis(Durations.toMillis(slidingParams.getSize())))
-            .every(Duration.millis(Durations.toMillis(slidingParams.getPeriod())))
-            .withOffset(Duration.millis(Timestamps.toMillis(slidingParams.getOffset())));
-      case SESSION_WINDOWS_FN:
-        StandardWindowFns.SessionsPayload sessionParams =
-            windowFnSpec.getSpec().getParameter().unpack(
-                StandardWindowFns.SessionsPayload.class);
-        return Sessions.withGapDuration(
-            Duration.millis(Durations.toMillis(sessionParams.getGapSize())));
-      case SERIALIZED_JAVA_WINDOWFN_URN:
-      case OLD_SERIALIZED_JAVA_WINDOWFN_URN:
-        return (WindowFn<?, ?>) SerializableUtils.deserializeFromByteArray(
-            windowFnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(),
-            "WindowFn");
-      default:
-        throw new IllegalArgumentException(
-            "Unknown or unsupported WindowFn: " + windowFnSpec.getSpec().getUrn());
+  public static WindowFn<?, ?> windowFnFromProto(SdkFunctionSpec windowFnSpec) {
+    try {
+      switch (windowFnSpec.getSpec().getUrn()) {
+        case GLOBAL_WINDOWS_FN:
+          return new GlobalWindows();
+        case FIXED_WINDOWS_FN:
+          StandardWindowFns.FixedWindowsPayload fixedParams =
+              windowFnSpec
+                  .getSpec()
+                  .getParameter()
+                  .unpack(StandardWindowFns.FixedWindowsPayload.class);
+          return FixedWindows.of(Duration.millis(Durations.toMillis(fixedParams.getSize())))
+              .withOffset(Duration.millis(Timestamps.toMillis(fixedParams.getOffset())));
+        case SLIDING_WINDOWS_FN:
+          StandardWindowFns.SlidingWindowsPayload slidingParams =
+              windowFnSpec
+                  .getSpec()
+                  .getParameter()
+                  .unpack(StandardWindowFns.SlidingWindowsPayload.class);
+          return SlidingWindows.of(Duration.millis(Durations.toMillis(slidingParams.getSize())))
+              .every(Duration.millis(Durations.toMillis(slidingParams.getPeriod())))
+              .withOffset(Duration.millis(Timestamps.toMillis(slidingParams.getOffset())));
+        case SESSION_WINDOWS_FN:
+          StandardWindowFns.SessionsPayload sessionParams =
+              windowFnSpec.getSpec().getParameter().unpack(StandardWindowFns.SessionsPayload.class);
+          return Sessions.withGapDuration(
+              Duration.millis(Durations.toMillis(sessionParams.getGapSize())));
+        case SERIALIZED_JAVA_WINDOWFN_URN:
+        case OLD_SERIALIZED_JAVA_WINDOWFN_URN:
+          return (WindowFn<?, ?>)
+              SerializableUtils.deserializeFromByteArray(
+                  windowFnSpec
+                      .getSpec()
+                      .getParameter()
+                      .unpack(BytesValue.class)
+                      .getValue()
+                      .toByteArray(),
+                  "WindowFn");
+        default:
+          throw new IllegalArgumentException(
+              "Unknown or unsupported WindowFn: " + windowFnSpec.getSpec().getUrn());
+      }
+    } catch (InvalidProtocolBufferException exc) {
+      throw new IllegalArgumentException(
+          String.format(
+              "%s for %s with URN %s did not contain expected proto message for payload",
+              FunctionSpec.class.getSimpleName(),
+              WindowFn.class.getSimpleName(),
+              windowFnSpec.getSpec().getUrn()));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/e38dc5fb/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowIntoTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowIntoTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowIntoTranslationTest.java
index cb9617a..3ba2d6f 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowIntoTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowIntoTranslationTest.java
@@ -95,7 +95,7 @@ public class WindowIntoTranslationTest {
     WindowIntoPayload payload =
         WindowIntoTranslation.toProto(assign.get().getTransform(), components);
 
-    assertEquals(windowFn, WindowIntoTranslation.getWindowFn(payload));
+    assertEquals(windowFn, WindowingStrategyTranslation.windowFnFromProto(payload.getWindowFn()));
   }
 
   private static class CustomWindows extends PartitioningWindowFn<String, BoundedWindow> {

http://git-wip-us.apache.org/repos/asf/beam/blob/e38dc5fb/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
index f4228d9..2d37b27 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.direct;
 import com.google.common.collect.Iterables;
 import java.util.Collection;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.WindowIntoTranslation;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -52,7 +53,9 @@ class WindowEvaluatorFactory implements TransformEvaluatorFactory {
   private <InputT> TransformEvaluator<InputT> createTransformEvaluator(
       AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Assign<InputT>>
           transform) {
-    WindowFn<? super InputT, ?> fn = transform.getTransform().getWindowFn();
+
+    WindowFn<? super InputT, ?> fn = (WindowFn) WindowIntoTranslation.getWindowFn(transform);
+
     UncommittedBundle<InputT> outputBundle =
         evaluationContext.createBundle(
             (PCollection<InputT>) Iterables.getOnlyElement(transform.getOutputs().values()));


[2/2] beam git commit: This closes #3609: Use dehydration-insensitive APIs in WindowEvaluatorFactory

Posted by ke...@apache.org.
This closes #3609: Use dehydration-insensitive APIs in WindowEvaluatorFactory


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8bd64759
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8bd64759
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8bd64759

Branch: refs/heads/master
Commit: 8bd64759616a2f262f60ce2998b98f0dd1dd2000
Parents: f870bf5 e38dc5f
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Jul 21 11:36:46 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jul 21 11:36:46 2017 -0700

----------------------------------------------------------------------
 .../construction/WindowIntoTranslation.java     | 46 +++++++++--
 .../WindowingStrategyTranslation.java           | 83 ++++++++++++--------
 .../construction/WindowIntoTranslationTest.java |  2 +-
 .../runners/direct/WindowEvaluatorFactory.java  |  5 +-
 4 files changed, 95 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8bd64759/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
----------------------------------------------------------------------