You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/10/17 21:08:14 UTC
[06/14] beam git commit: Add custom rehydration for TestStream
Add custom rehydration for TestStream
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6abf6f52
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6abf6f52
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6abf6f52
Branch: refs/heads/master
Commit: 6abf6f520df9efd5950063019bbc33ddc85a5c97
Parents: 10c63e1
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 2 20:20:53 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Tue Oct 17 12:45:11 2017 -0700
----------------------------------------------------------------------
.../construction/TestStreamTranslation.java | 171 +++++++++++++++----
.../construction/TestStreamTranslationTest.java | 4 +-
2 files changed, 142 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/6abf6f52/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
index 8e4c1db..1b18844 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
@@ -22,12 +22,14 @@ import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.runners.core.construction.PTransformTranslation.TEST_STREAM_TRANSFORM_URN;
import com.google.auto.service.AutoService;
+import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nonnull;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
import org.apache.beam.sdk.coders.Coder;
@@ -47,21 +49,74 @@ import org.joda.time.Instant;
*/
public class TestStreamTranslation {
- static <T> RunnerApi.TestStreamPayload testStreamToPayload(
- TestStream<T> transform, SdkComponents components) throws IOException {
- String coderId = components.registerCoder(transform.getValueCoder());
+ private interface TestStreamLike {
+ Coder<?> getValueCoder();
- RunnerApi.TestStreamPayload.Builder builder =
- RunnerApi.TestStreamPayload.newBuilder().setCoderId(coderId);
+ List<RunnerApi.TestStreamPayload.Event> getEvents();
+ }
+
+ @VisibleForTesting
+ static class RawTestStream<T> extends PTransformTranslation.RawPTransform<PBegin, PCollection<T>>
+ implements TestStreamLike {
+
+ private final transient RehydratedComponents rehydratedComponents;
+ private final RunnerApi.TestStreamPayload payload;
+ private final Coder<T> valueCoder;
+ private final RunnerApi.FunctionSpec spec;
+
+ public RawTestStream(
+ RunnerApi.TestStreamPayload payload, RehydratedComponents rehydratedComponents) {
+ this.payload = payload;
+ this.spec =
+ RunnerApi.FunctionSpec.newBuilder()
+ .setUrn(TEST_STREAM_TRANSFORM_URN)
+ .setPayload(payload.toByteString())
+ .build();
+ this.rehydratedComponents = rehydratedComponents;
+
+ // Eagerly extract the coder to throw a good exception here
+ try {
+ this.valueCoder = (Coder<T>) rehydratedComponents.getCoder(payload.getCoderId());
+ } catch (IOException exc) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Failure extracting coder with id '%s' for %s",
+ payload.getCoderId(), TestStream.class.getSimpleName()),
+ exc);
+ }
+ }
+
+ @Override
+ public String getUrn() {
+ return TEST_STREAM_TRANSFORM_URN;
+ }
+
+ @Nonnull
+ @Override
+ public RunnerApi.FunctionSpec getSpec() {
+ return spec;
+ }
- for (TestStream.Event<T> event : transform.getEvents()) {
- builder.addEvents(toProto(event, transform.getValueCoder()));
+ @Override
+ public RunnerApi.FunctionSpec migrate(SdkComponents components) throws IOException {
+ return RunnerApi.FunctionSpec.newBuilder()
+ .setUrn(TEST_STREAM_TRANSFORM_URN)
+ .setPayload(payloadForTestStreamLike(this, components).toByteString())
+ .build();
}
- return builder.build();
+ @Override
+ public Coder<T> getValueCoder() {
+ return valueCoder;
+ }
+
+ @Override
+ public List<RunnerApi.TestStreamPayload.Event> getEvents() {
+ return payload.getEventsList();
+ }
}
- private static TestStream<?> fromProto(
+ private static TestStream<?> testStreamFromProtoPayload(
RunnerApi.TestStreamPayload testStreamPayload, RehydratedComponents components)
throws IOException {
@@ -70,7 +125,7 @@ public class TestStreamTranslation {
List<TestStream.Event<Object>> events = new ArrayList<>();
for (RunnerApi.TestStreamPayload.Event event : testStreamPayload.getEventsList()) {
- events.add(fromProto(event, coder));
+ events.add(eventFromProto(event, coder));
}
return TestStream.fromRawEvents(coder, events);
}
@@ -98,12 +153,12 @@ public class TestStreamTranslation {
RunnerApi.TestStreamPayload.parseFrom(transformProto.getSpec().getPayload());
return (TestStream<T>)
- fromProto(
+ testStreamFromProtoPayload(
testStreamPayload, RehydratedComponents.forComponents(sdkComponents.toComponents()));
}
- static <T> RunnerApi.TestStreamPayload.Event toProto(TestStream.Event<T> event, Coder<T> coder)
- throws IOException {
+ static <T> RunnerApi.TestStreamPayload.Event eventToProto(
+ TestStream.Event<T> event, Coder<T> coder) throws IOException {
switch (event.getType()) {
case WATERMARK:
return RunnerApi.TestStreamPayload.Event.newBuilder()
@@ -143,7 +198,7 @@ public class TestStreamTranslation {
}
}
- static <T> TestStream.Event<T> fromProto(
+ static <T> TestStream.Event<T> eventFromProto(
RunnerApi.TestStreamPayload.Event protoEvent, Coder<T> coder) throws IOException {
switch (protoEvent.getEventCase()) {
case WATERMARK_EVENT:
@@ -172,8 +227,8 @@ public class TestStreamTranslation {
}
}
- static class TestStreamTranslator
- extends TransformPayloadTranslator.WithDefaultRehydration<TestStream<?>> {
+ /** A translator registered to translate {@link TestStream} objects to protobuf representation. */
+ static class TestStreamTranslator implements TransformPayloadTranslator<TestStream<?>> {
@Override
public String getUrn(TestStream<?> transform) {
return TEST_STREAM_TRANSFORM_URN;
@@ -181,27 +236,81 @@ public class TestStreamTranslation {
@Override
public RunnerApi.FunctionSpec translate(
- AppliedPTransform<?, ?, TestStream<?>> transform, SdkComponents components)
+ final AppliedPTransform<?, ?, TestStream<?>> transform, SdkComponents components)
throws IOException {
- return RunnerApi.FunctionSpec.newBuilder()
- .setUrn(getUrn(transform.getTransform()))
- .setPayload(testStreamToPayload(transform.getTransform(), components).toByteString())
- .build();
+ return translateTyped(transform.getTransform(), components);
}
- }
- /** Registers {@link TestStreamTranslator}. */
- @AutoService(TransformPayloadTranslatorRegistrar.class)
- public static class Registrar implements TransformPayloadTranslatorRegistrar {
@Override
- public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
- getTransformPayloadTranslators() {
- return Collections.singletonMap(TestStream.class, new TestStreamTranslator());
+ public PTransformTranslation.RawPTransform<?, ?> rehydrate(
+ RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents)
+ throws IOException {
+ checkArgument(
+ protoTransform.getSpec() != null,
+ "%s received transform with null spec",
+ getClass().getSimpleName());
+ checkArgument(protoTransform.getSpec().getUrn().equals(TEST_STREAM_TRANSFORM_URN));
+ return new RawTestStream<>(
+ RunnerApi.TestStreamPayload.parseFrom(protoTransform.getSpec().getPayload()),
+ rehydratedComponents);
}
- @Override
- public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
- return Collections.emptyMap();
+ private <T> RunnerApi.FunctionSpec translateTyped(
+ final TestStream<T> testStream, SdkComponents components) throws IOException {
+ return RunnerApi.FunctionSpec.newBuilder()
+ .setUrn(TEST_STREAM_TRANSFORM_URN)
+ .setPayload(payloadForTestStream(testStream, components).toByteString())
+ .build();
}
+
+ /** Registers {@link TestStreamTranslator}. */
+ @AutoService(TransformPayloadTranslatorRegistrar.class)
+ public static class Registrar implements TransformPayloadTranslatorRegistrar {
+ @Override
+ public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+ getTransformPayloadTranslators() {
+ return Collections.singletonMap(TestStream.class, new TestStreamTranslator());
+ }
+
+ @Override
+ public Map<String, ? extends TransformPayloadTranslator> getTransformRehydrators() {
+ return Collections.singletonMap(TEST_STREAM_TRANSFORM_URN, new TestStreamTranslator());
+ }
+ }
+ }
+
+ /** Produces a {@link RunnerApi.TestStreamPayload} from a portable {@link RawTestStream}. */
+ static RunnerApi.TestStreamPayload payloadForTestStreamLike(
+ TestStreamLike transform, SdkComponents components) throws IOException {
+ return RunnerApi.TestStreamPayload.newBuilder()
+ .setCoderId(components.registerCoder(transform.getValueCoder()))
+ .addAllEvents(transform.getEvents())
+ .build();
+ }
+
+ @VisibleForTesting
+ static <T> RunnerApi.TestStreamPayload payloadForTestStream(
+ final TestStream<T> testStream, SdkComponents components) throws IOException {
+ return payloadForTestStreamLike(
+ new TestStreamLike() {
+ @Override
+ public Coder<T> getValueCoder() {
+ return testStream.getValueCoder();
+ }
+
+ @Override
+ public List<RunnerApi.TestStreamPayload.Event> getEvents() {
+ try {
+ List<RunnerApi.TestStreamPayload.Event> protoEvents = new ArrayList<>();
+ for (TestStream.Event<T> event : testStream.getEvents()) {
+ protoEvents.add(eventToProto(event, testStream.getValueCoder()));
+ }
+ return protoEvents;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ },
+ components);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6abf6f52/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java
index 3678fc7..fc30552 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java
@@ -81,7 +81,7 @@ public class TestStreamTranslationTest {
public void testEncodedProto() throws Exception {
SdkComponents components = SdkComponents.create();
RunnerApi.TestStreamPayload payload =
- TestStreamTranslation.testStreamToPayload(testStream, components);
+ TestStreamTranslation.payloadForTestStream(testStream, components);
verifyTestStreamEncoding(
testStream, payload, RehydratedComponents.forComponents(components.toComponents()));
@@ -122,7 +122,7 @@ public class TestStreamTranslationTest {
for (int i = 0; i < payload.getEventsList().size(); ++i) {
assertThat(
- TestStreamTranslation.fromProto(payload.getEvents(i), testStream.getValueCoder()),
+ TestStreamTranslation.eventFromProto(payload.getEvents(i), testStream.getValueCoder()),
equalTo(testStream.getEvents().get(i)));
}
}