You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/10/17 21:08:14 UTC

[06/14] beam git commit: Add custom rehydration for TestStream

Add custom rehydration for TestStream


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6abf6f52
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6abf6f52
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6abf6f52

Branch: refs/heads/master
Commit: 6abf6f520df9efd5950063019bbc33ddc85a5c97
Parents: 10c63e1
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 2 20:20:53 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Tue Oct 17 12:45:11 2017 -0700

----------------------------------------------------------------------
 .../construction/TestStreamTranslation.java     | 171 +++++++++++++++----
 .../construction/TestStreamTranslationTest.java |   4 +-
 2 files changed, 142 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6abf6f52/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
index 8e4c1db..1b18844 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
@@ -22,12 +22,14 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.beam.runners.core.construction.PTransformTranslation.TEST_STREAM_TRANSFORM_URN;
 
 import com.google.auto.service.AutoService;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nonnull;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
 import org.apache.beam.sdk.coders.Coder;
@@ -47,21 +49,74 @@ import org.joda.time.Instant;
  */
 public class TestStreamTranslation {
 
-  static <T> RunnerApi.TestStreamPayload testStreamToPayload(
-      TestStream<T> transform, SdkComponents components) throws IOException {
-    String coderId = components.registerCoder(transform.getValueCoder());
+  private interface TestStreamLike {
+    Coder<?> getValueCoder();
 
-    RunnerApi.TestStreamPayload.Builder builder =
-        RunnerApi.TestStreamPayload.newBuilder().setCoderId(coderId);
+    List<RunnerApi.TestStreamPayload.Event> getEvents();
+  }
+
+  @VisibleForTesting
+  static class RawTestStream<T> extends PTransformTranslation.RawPTransform<PBegin, PCollection<T>>
+      implements TestStreamLike {
+
+    private final transient RehydratedComponents rehydratedComponents;
+    private final RunnerApi.TestStreamPayload payload;
+    private final Coder<T> valueCoder;
+    private final RunnerApi.FunctionSpec spec;
+
+    public RawTestStream(
+        RunnerApi.TestStreamPayload payload, RehydratedComponents rehydratedComponents) {
+      this.payload = payload;
+      this.spec =
+          RunnerApi.FunctionSpec.newBuilder()
+              .setUrn(TEST_STREAM_TRANSFORM_URN)
+              .setPayload(payload.toByteString())
+              .build();
+      this.rehydratedComponents = rehydratedComponents;
+
+      // Eagerly extract the coder to throw a good exception here
+      try {
+        this.valueCoder = (Coder<T>) rehydratedComponents.getCoder(payload.getCoderId());
+      } catch (IOException exc) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Failure extracting coder with id '%s' for %s",
+                payload.getCoderId(), TestStream.class.getSimpleName()),
+            exc);
+      }
+    }
+
+    @Override
+    public String getUrn() {
+      return TEST_STREAM_TRANSFORM_URN;
+    }
+
+    @Nonnull
+    @Override
+    public RunnerApi.FunctionSpec getSpec() {
+      return spec;
+    }
 
-    for (TestStream.Event<T> event : transform.getEvents()) {
-      builder.addEvents(toProto(event, transform.getValueCoder()));
+    @Override
+    public RunnerApi.FunctionSpec migrate(SdkComponents components) throws IOException {
+      return RunnerApi.FunctionSpec.newBuilder()
+          .setUrn(TEST_STREAM_TRANSFORM_URN)
+          .setPayload(payloadForTestStreamLike(this, components).toByteString())
+          .build();
     }
 
-    return builder.build();
+    @Override
+    public Coder<T> getValueCoder() {
+      return valueCoder;
+    }
+
+    @Override
+    public List<RunnerApi.TestStreamPayload.Event> getEvents() {
+      return payload.getEventsList();
+    }
   }
 
-  private static TestStream<?> fromProto(
+  private static TestStream<?> testStreamFromProtoPayload(
       RunnerApi.TestStreamPayload testStreamPayload, RehydratedComponents components)
       throws IOException {
 
@@ -70,7 +125,7 @@ public class TestStreamTranslation {
     List<TestStream.Event<Object>> events = new ArrayList<>();
 
     for (RunnerApi.TestStreamPayload.Event event : testStreamPayload.getEventsList()) {
-      events.add(fromProto(event, coder));
+      events.add(eventFromProto(event, coder));
     }
     return TestStream.fromRawEvents(coder, events);
   }
@@ -98,12 +153,12 @@ public class TestStreamTranslation {
         RunnerApi.TestStreamPayload.parseFrom(transformProto.getSpec().getPayload());
 
     return (TestStream<T>)
-        fromProto(
+        testStreamFromProtoPayload(
             testStreamPayload, RehydratedComponents.forComponents(sdkComponents.toComponents()));
   }
 
-  static <T> RunnerApi.TestStreamPayload.Event toProto(TestStream.Event<T> event, Coder<T> coder)
-      throws IOException {
+  static <T> RunnerApi.TestStreamPayload.Event eventToProto(
+      TestStream.Event<T> event, Coder<T> coder) throws IOException {
     switch (event.getType()) {
       case WATERMARK:
         return RunnerApi.TestStreamPayload.Event.newBuilder()
@@ -143,7 +198,7 @@ public class TestStreamTranslation {
     }
   }
 
-  static <T> TestStream.Event<T> fromProto(
+  static <T> TestStream.Event<T> eventFromProto(
       RunnerApi.TestStreamPayload.Event protoEvent, Coder<T> coder) throws IOException {
     switch (protoEvent.getEventCase()) {
       case WATERMARK_EVENT:
@@ -172,8 +227,8 @@ public class TestStreamTranslation {
     }
   }
 
-  static class TestStreamTranslator
-      extends TransformPayloadTranslator.WithDefaultRehydration<TestStream<?>> {
+  /** A translator registered to translate {@link TestStream} objects to protobuf representation. */
+  static class TestStreamTranslator implements TransformPayloadTranslator<TestStream<?>> {
     @Override
     public String getUrn(TestStream<?> transform) {
       return TEST_STREAM_TRANSFORM_URN;
@@ -181,27 +236,81 @@ public class TestStreamTranslation {
 
     @Override
     public RunnerApi.FunctionSpec translate(
-        AppliedPTransform<?, ?, TestStream<?>> transform, SdkComponents components)
+        final AppliedPTransform<?, ?, TestStream<?>> transform, SdkComponents components)
         throws IOException {
-      return RunnerApi.FunctionSpec.newBuilder()
-          .setUrn(getUrn(transform.getTransform()))
-          .setPayload(testStreamToPayload(transform.getTransform(), components).toByteString())
-          .build();
+      return translateTyped(transform.getTransform(), components);
     }
-  }
 
-  /** Registers {@link TestStreamTranslator}. */
-  @AutoService(TransformPayloadTranslatorRegistrar.class)
-  public static class Registrar implements TransformPayloadTranslatorRegistrar {
     @Override
-    public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
-        getTransformPayloadTranslators() {
-      return Collections.singletonMap(TestStream.class, new TestStreamTranslator());
+    public PTransformTranslation.RawPTransform<?, ?> rehydrate(
+        RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents)
+        throws IOException {
+      checkArgument(
+          protoTransform.getSpec() != null,
+          "%s received transform with null spec",
+          getClass().getSimpleName());
+      checkArgument(protoTransform.getSpec().getUrn().equals(TEST_STREAM_TRANSFORM_URN));
+      return new RawTestStream<>(
+          RunnerApi.TestStreamPayload.parseFrom(protoTransform.getSpec().getPayload()),
+          rehydratedComponents);
     }
 
-    @Override
-    public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
-      return Collections.emptyMap();
+    private <T> RunnerApi.FunctionSpec translateTyped(
+        final TestStream<T> testStream, SdkComponents components) throws IOException {
+      return RunnerApi.FunctionSpec.newBuilder()
+          .setUrn(TEST_STREAM_TRANSFORM_URN)
+          .setPayload(payloadForTestStream(testStream, components).toByteString())
+          .build();
     }
+
+    /** Registers {@link TestStreamTranslator}. */
+    @AutoService(TransformPayloadTranslatorRegistrar.class)
+    public static class Registrar implements TransformPayloadTranslatorRegistrar {
+      @Override
+      public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+          getTransformPayloadTranslators() {
+        return Collections.singletonMap(TestStream.class, new TestStreamTranslator());
+      }
+
+      @Override
+      public Map<String, ? extends TransformPayloadTranslator> getTransformRehydrators() {
+        return Collections.singletonMap(TEST_STREAM_TRANSFORM_URN, new TestStreamTranslator());
+      }
+    }
+  }
+
+  /** Produces a {@link RunnerApi.TestStreamPayload} from a portable {@link RawTestStream}. */
+  static RunnerApi.TestStreamPayload payloadForTestStreamLike(
+      TestStreamLike transform, SdkComponents components) throws IOException {
+    return RunnerApi.TestStreamPayload.newBuilder()
+        .setCoderId(components.registerCoder(transform.getValueCoder()))
+        .addAllEvents(transform.getEvents())
+        .build();
+  }
+
+  @VisibleForTesting
+  static <T> RunnerApi.TestStreamPayload payloadForTestStream(
+      final TestStream<T> testStream, SdkComponents components) throws IOException {
+    return payloadForTestStreamLike(
+        new TestStreamLike() {
+          @Override
+          public Coder<T> getValueCoder() {
+            return testStream.getValueCoder();
+          }
+
+          @Override
+          public List<RunnerApi.TestStreamPayload.Event> getEvents() {
+            try {
+              List<RunnerApi.TestStreamPayload.Event> protoEvents = new ArrayList<>();
+              for (TestStream.Event<T> event : testStream.getEvents()) {
+                protoEvents.add(eventToProto(event, testStream.getValueCoder()));
+              }
+              return protoEvents;
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        },
+        components);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/6abf6f52/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java
index 3678fc7..fc30552 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java
@@ -81,7 +81,7 @@ public class TestStreamTranslationTest {
     public void testEncodedProto() throws Exception {
       SdkComponents components = SdkComponents.create();
       RunnerApi.TestStreamPayload payload =
-          TestStreamTranslation.testStreamToPayload(testStream, components);
+          TestStreamTranslation.payloadForTestStream(testStream, components);
 
       verifyTestStreamEncoding(
           testStream, payload, RehydratedComponents.forComponents(components.toComponents()));
@@ -122,7 +122,7 @@ public class TestStreamTranslationTest {
 
       for (int i = 0; i < payload.getEventsList().size(); ++i) {
         assertThat(
-            TestStreamTranslation.fromProto(payload.getEvents(i), testStream.getValueCoder()),
+            TestStreamTranslation.eventFromProto(payload.getEvents(i), testStream.getValueCoder()),
             equalTo(testStream.getEvents().get(i)));
       }
     }