You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/12 16:20:53 UTC

[1/2] beam git commit: Use SdkComponents in WindowingStrategy.toProto

Repository: beam
Updated Branches:
  refs/heads/master 7e603d5c7 -> 571631a5e


Use SdkComponents in WindowingStrategy.toProto


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e0fd0a22
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e0fd0a22
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e0fd0a22

Branch: refs/heads/master
Commit: e0fd0a222510b733b98d6c33c694431139bbc40d
Parents: 7e603d5
Author: Thomas Groh <tg...@google.com>
Authored: Fri Apr 7 13:41:29 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Apr 12 09:20:43 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/construction/Coders.java  |  6 +-
 .../core/construction/SdkComponents.java        |  5 +-
 .../core/construction/WindowingStrategies.java  | 94 ++++++++------------
 .../core/construction/SdkComponentsTest.java    |  6 +-
 .../construction/WindowingStrategiesTest.java   | 18 ++++
 5 files changed, 66 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e0fd0a22/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
index d890de7..7b96240 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
@@ -76,8 +76,10 @@ public class Coders {
   private static RunnerApi.Coder toKnownCoder(Coder<?> coder, SdkComponents components)
       throws IOException {
     List<String> componentIds = new ArrayList<>();
-    for (Coder<?> componentCoder : coder.getCoderArguments()) {
-      componentIds.add(components.registerCoder(componentCoder));
+    if (coder.getCoderArguments() != null) {
+      for (Coder<?> componentCoder : coder.getCoderArguments()) {
+        componentIds.add(components.registerCoder(componentCoder));
+      }
     }
     return RunnerApi.Coder.newBuilder()
         .addAllComponentCoderIds(componentIds)

http://git-wip-us.apache.org/repos/asf/beam/blob/e0fd0a22/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
index 5cb0a00..03f3a03 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
@@ -96,7 +96,7 @@ class SdkComponents {
    * unique ID for the {@link WindowingStrategy}. Multiple registrations of the same {@link
    * WindowingStrategy} will return the same unique ID.
    */
-  String registerWindowingStrategy(WindowingStrategy<?, ?> windowingStrategy) {
+  String registerWindowingStrategy(WindowingStrategy<?, ?> windowingStrategy) throws IOException {
     String existing = windowingStrategyIds.get(windowingStrategy);
     if (existing != null) {
       return existing;
@@ -108,6 +108,9 @@ class SdkComponents {
             NameUtils.approximateSimpleName(windowingStrategy.getWindowFn()));
     String name = uniqify(baseName, windowingStrategyIds.values());
     windowingStrategyIds.put(windowingStrategy, name);
+    RunnerApi.WindowingStrategy windowingStrategyProto =
+        WindowingStrategies.toProto(windowingStrategy, this);
+    componentsBuilder.putWindowingStrategies(name, windowingStrategyProto);
     return name;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e0fd0a22/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
index 353be05..6d721b0 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
@@ -19,15 +19,12 @@ package org.apache.beam.runners.core.construction;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.protobuf.Any;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.BytesValue;
 import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.UUID;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
@@ -127,62 +124,30 @@ public class WindowingStrategies implements Serializable {
     }
   }
 
-  // This URN says that the coder is just a UDF blob the indicated SDK understands
-  // TODO: standardize such things
-  public static final String CUSTOM_CODER_URN = "urn:beam:coders:javasdk:0.1";
-
   // This URN says that the WindowFn is just a UDF blob the indicated SDK understands
   // TODO: standardize such things
   public static final String CUSTOM_WINDOWFN_URN = "urn:beam:windowfn:javasdk:0.1";
 
-  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
   /**
-   * Converts a {@link WindowFn} into a {@link RunnerApi.MessageWithComponents} where
-   * {@link RunnerApi.MessageWithComponents#getFunctionSpec()} is a {@link FunctionSpec}
-   * for the input {@link WindowFn}.
+   * Converts a {@link WindowFn} into an {@link SdkFunctionSpec} whose {@link
+   * SdkFunctionSpec#getSpec()} is a {@link RunnerApi.FunctionSpec} carrying the serialized
+   * input {@link WindowFn}.
    */
-  public static RunnerApi.MessageWithComponents toProto(WindowFn<?, ?> windowFn)
+  public static SdkFunctionSpec toProto(
+      WindowFn<?, ?> windowFn, @SuppressWarnings("unused") SdkComponents components)
       throws IOException {
-    Coder<?> windowCoder = windowFn.windowCoder();
-
-    // TODO: re-use components
-    String windowCoderId = UUID.randomUUID().toString();
-
-    SdkFunctionSpec windowFnSpec =
-        SdkFunctionSpec.newBuilder()
-            .setSpec(
-                FunctionSpec.newBuilder()
-                    .setUrn(CUSTOM_WINDOWFN_URN)
-                    .setParameter(
-                        Any.pack(
-                            BytesValue.newBuilder()
-                                .setValue(
-                                    ByteString.copyFrom(
-                                        SerializableUtils.serializeToByteArray(windowFn)))
-                                .build())))
-            .build();
-
-    RunnerApi.Coder windowCoderProto =
-        RunnerApi.Coder.newBuilder()
-            .setSpec(
-                SdkFunctionSpec.newBuilder()
-                    .setSpec(
-                        FunctionSpec.newBuilder()
-                            .setUrn(CUSTOM_CODER_URN)
-                            .setParameter(
-                                Any.pack(
-                                    BytesValue.newBuilder()
-                                        .setValue(
-                                            ByteString.copyFrom(
-                                                OBJECT_MAPPER.writeValueAsBytes(
-                                                    windowCoder.asCloudObject())))
-                                        .build()))))
-            .build();
-
-    return RunnerApi.MessageWithComponents.newBuilder()
-        .setSdkFunctionSpec(windowFnSpec)
-        .setComponents(Components.newBuilder().putCoders(windowCoderId, windowCoderProto))
+    return SdkFunctionSpec.newBuilder()
+        // TODO: Set environment ID
+        .setSpec(
+            FunctionSpec.newBuilder()
+                .setUrn(CUSTOM_WINDOWFN_URN)
+                .setParameter(
+                    Any.pack(
+                        BytesValue.newBuilder()
+                            .setValue(
+                                ByteString.copyFrom(
+                                    SerializableUtils.serializeToByteArray(windowFn)))
+                            .build())))
         .build();
   }
 
@@ -194,9 +159,22 @@ public class WindowingStrategies implements Serializable {
    */
   public static RunnerApi.MessageWithComponents toProto(WindowingStrategy<?, ?> windowingStrategy)
       throws IOException {
+    SdkComponents components = SdkComponents.create();
+    RunnerApi.WindowingStrategy windowingStrategyProto = toProto(windowingStrategy, components);
 
-    RunnerApi.MessageWithComponents windowFnWithComponents =
-        toProto(windowingStrategy.getWindowFn());
+    return RunnerApi.MessageWithComponents.newBuilder()
+        .setWindowingStrategy(windowingStrategyProto)
+        .setComponents(components.toComponents())
+        .build();
+  }
+
+  /**
+   * Converts a {@link WindowingStrategy} into a {@link RunnerApi.WindowingStrategy}, registering
+   * any components in the provided {@link SdkComponents}.
+   */
+  public static RunnerApi.WindowingStrategy toProto(
+      WindowingStrategy<?, ?> windowingStrategy, SdkComponents components) throws IOException {
+    SdkFunctionSpec windowFnSpec = toProto(windowingStrategy.getWindowFn(), components);
 
     RunnerApi.WindowingStrategy.Builder windowingStrategyProto =
         RunnerApi.WindowingStrategy.newBuilder()
@@ -205,11 +183,11 @@ public class WindowingStrategies implements Serializable {
             .setClosingBehavior(toProto(windowingStrategy.getClosingBehavior()))
             .setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis())
             .setTrigger(Triggers.toProto(windowingStrategy.getTrigger()))
-            .setWindowFn(windowFnWithComponents.getSdkFunctionSpec());
+            .setWindowFn(windowFnSpec)
+            .setWindowCoderId(
+                components.registerCoder(windowingStrategy.getWindowFn().windowCoder()));
 
-    return RunnerApi.MessageWithComponents.newBuilder()
-        .setWindowingStrategy(windowingStrategyProto)
-        .setComponents(windowFnWithComponents.getComponents()).build();
+    return windowingStrategyProto.build();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/e0fd0a22/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
index 28b4911..ef4b16b 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
@@ -132,15 +132,17 @@ public class SdkComponentsTest {
   }
 
   @Test
-  public void registerWindowingStrategy() {
+  public void registerWindowingStrategy() throws IOException {
     WindowingStrategy<?, ?> strategy =
         WindowingStrategy.globalDefault().withMode(AccumulationMode.ACCUMULATING_FIRED_PANES);
     String name = components.registerWindowingStrategy(strategy);
     assertThat(name, not(isEmptyOrNullString()));
+
+    components.toComponents().getWindowingStrategiesOrThrow(name);
   }
 
   @Test
-  public void registerWindowingStrategyIdEqualStrategies() {
+  public void registerWindowingStrategyIdEqualStrategies() throws IOException {
     WindowingStrategy<?, ?> strategy =
         WindowingStrategy.globalDefault().withMode(AccumulationMode.ACCUMULATING_FIRED_PANES);
     String name = components.registerWindowingStrategy(strategy);

http://git-wip-us.apache.org/repos/asf/beam/blob/e0fd0a22/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
index b603d65..62bba8e 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertThat;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
@@ -30,6 +31,7 @@ import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
+import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -89,4 +91,20 @@ public class WindowingStrategiesTest {
         toProtoAndBackWindowingStrategy,
         equalTo((WindowingStrategy) windowingStrategy.fixDefaults()));
   }
+
+  @Test
+  public void testToProtoAndBackWithComponents() throws Exception {
+    WindowingStrategy<?, ?> windowingStrategy = toProtoAndBackSpec.getWindowingStrategy();
+    SdkComponents components = SdkComponents.create();
+    RunnerApi.WindowingStrategy proto =
+        WindowingStrategies.toProto(windowingStrategy, components);
+    RunnerApi.Components protoComponents = components.toComponents();
+
+    assertThat(
+        WindowingStrategies.fromProto(proto, protoComponents).fixDefaults(),
+        Matchers.<WindowingStrategy<?, ?>>equalTo(windowingStrategy.fixDefaults()));
+
+    protoComponents.getCodersOrThrow(
+        components.registerCoder(windowingStrategy.getWindowFn().windowCoder()));
+  }
 }


[2/2] beam git commit: This closes #2490

Posted by dh...@apache.org.
This closes #2490


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/571631a5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/571631a5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/571631a5

Branch: refs/heads/master
Commit: 571631a5ef83ef31ea9f652c5698cea3cc95ad43
Parents: 7e603d5 e0fd0a2
Author: Dan Halperin <dh...@google.com>
Authored: Wed Apr 12 09:20:46 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Apr 12 09:20:46 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/construction/Coders.java  |  6 +-
 .../core/construction/SdkComponents.java        |  5 +-
 .../core/construction/WindowingStrategies.java  | 94 ++++++++------------
 .../core/construction/SdkComponentsTest.java    |  6 +-
 .../construction/WindowingStrategiesTest.java   | 18 ++++
 5 files changed, 66 insertions(+), 63 deletions(-)
----------------------------------------------------------------------