You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/12 16:56:02 UTC
[47/50] [abbrv] beam git commit: Port DirectRunner TestStream
override to SDK-agnostic APIs
Port DirectRunner TestStream override to SDK-agnostic APIs
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/eaaf45fa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/eaaf45fa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/eaaf45fa
Branch: refs/heads/gearpump-runner
Commit: eaaf45fa33d500a9f0fd0c2861aac4889ee5086c
Parents: ed6bd18
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 8 13:39:32 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jun 9 19:56:52 2017 -0700
----------------------------------------------------------------------
.../construction/TestStreamTranslation.java | 49 +++++++++++++++++++-
.../direct/TestStreamEvaluatorFactory.java | 20 ++++++--
.../org/apache/beam/sdk/testing/TestStream.java | 12 +++++
3 files changed, 75 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/eaaf45fa/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
index 90e6304..515de57 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
@@ -18,6 +18,9 @@
package org.apache.beam.runners.core.construction;
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.runners.core.construction.PTransformTranslation.TEST_STREAM_TRANSFORM_URN;
+
import com.google.auto.service.AutoService;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
@@ -33,6 +36,8 @@ import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -57,6 +62,48 @@ public class TestStreamTranslation {
return builder.build();
}
+ private static TestStream<?> fromProto(
+ RunnerApi.TestStreamPayload testStreamPayload, RunnerApi.Components components)
+ throws IOException {
+
+ Coder<Object> coder =
+ (Coder<Object>)
+ CoderTranslation.fromProto(
+ components.getCodersOrThrow(testStreamPayload.getCoderId()), components);
+
+ List<TestStream.Event<Object>> events = new ArrayList<>();
+
+ for (RunnerApi.TestStreamPayload.Event event : testStreamPayload.getEventsList()) {
+ events.add(fromProto(event, coder));
+ }
+ return TestStream.fromRawEvents(coder, events);
+ }
+
+ /**
+ * Converts an {@link AppliedPTransform}, which may be a rehydrated transform or an original
+ * {@link TestStream}, to a {@link TestStream}.
+ */
+ public static <T> TestStream<T> getTestStream(
+ AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> application)
+ throws IOException {
+ // For robustness, we don't take this shortcut:
+ // if (application.getTransform() instanceof TestStream) {
+ // return application.getTransform()
+ // }
+
+ SdkComponents sdkComponents = SdkComponents.create();
+ RunnerApi.PTransform transformProto = PTransformTranslation.toProto(application, sdkComponents);
+ checkArgument(
+ TEST_STREAM_TRANSFORM_URN.equals(transformProto.getSpec().getUrn()),
+ "Attempt to get %s from a transform with wrong URN %s",
+ TestStream.class.getSimpleName(),
+ transformProto.getSpec().getUrn());
+ RunnerApi.TestStreamPayload testStreamPayload =
+ transformProto.getSpec().getParameter().unpack(RunnerApi.TestStreamPayload.class);
+
+ return (TestStream<T>) fromProto(testStreamPayload, sdkComponents.toComponents());
+ }
+
static <T> RunnerApi.TestStreamPayload.Event toProto(TestStream.Event<T> event, Coder<T> coder)
throws IOException {
switch (event.getType()) {
@@ -130,7 +177,7 @@ public class TestStreamTranslation {
static class TestStreamTranslator implements TransformPayloadTranslator<TestStream<?>> {
@Override
public String getUrn(TestStream<?> transform) {
- return PTransformTranslation.TEST_STREAM_TRANSFORM_URN;
+ return TEST_STREAM_TRANSFORM_URN;
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/eaaf45fa/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index 2da7a71..16c8589 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -22,6 +22,7 @@ import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import com.google.common.collect.Iterables;
+import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -30,6 +31,7 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
+import org.apache.beam.runners.core.construction.TestStreamTranslation;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.testing.TestStream;
@@ -160,7 +162,8 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
}
static class DirectTestStreamFactory<T>
- implements PTransformOverrideFactory<PBegin, PCollection<T>, TestStream<T>> {
+ implements PTransformOverrideFactory<
+ PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> {
private final DirectRunner runner;
DirectTestStreamFactory(DirectRunner runner) {
@@ -169,10 +172,17 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
@Override
public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(
- AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> transform) {
- return PTransformReplacement.of(
- transform.getPipeline().begin(),
- new DirectTestStream<T>(runner, transform.getTransform()));
+ AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> transform) {
+ try {
+ return PTransformReplacement.of(
+ transform.getPipeline().begin(),
+ new DirectTestStream<T>(runner, TestStreamTranslation.getTestStream(transform)));
+ } catch (IOException exc) {
+ throw new RuntimeException(
+ String.format(
+ "Transform could not be converted to %s", TestStream.class.getSimpleName()),
+ exc);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/eaaf45fa/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
index 9ad8fd8..d13fcf1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
@@ -271,6 +271,18 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
return events;
}
+ /**
+ * <b>For internal use only. No backwards-compatibility guarantees.</b>
+ *
+ * <p>Build a test stream directly from events. No validation is performed on
+ * watermark monotonicity, etc. This is assumed to be a previously-serialized
+ * {@link TestStream} transform that is correct by construction.
+ */
+ @Internal
+ public static <T> TestStream<T> fromRawEvents(Coder<T> coder, List<Event<T>> events) {
+ return new TestStream<>(coder, events);
+ }
+
@Override
public boolean equals(Object other) {
if (!(other instanceof TestStream)) {