You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/10 03:14:00 UTC
[1/5] beam git commit: Port DirectRunner TestStream override to
SDK-agnostic APIs
Repository: beam
Updated Branches:
refs/heads/master b4c77167f -> 1597f3ca6
Port DirectRunner TestStream override to SDK-agnostic APIs
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/eaaf45fa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/eaaf45fa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/eaaf45fa
Branch: refs/heads/master
Commit: eaaf45fa33d500a9f0fd0c2861aac4889ee5086c
Parents: ed6bd18
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 8 13:39:32 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jun 9 19:56:52 2017 -0700
----------------------------------------------------------------------
.../construction/TestStreamTranslation.java | 49 +++++++++++++++++++-
.../direct/TestStreamEvaluatorFactory.java | 20 ++++++--
.../org/apache/beam/sdk/testing/TestStream.java | 12 +++++
3 files changed, 75 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/eaaf45fa/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
index 90e6304..515de57 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
@@ -18,6 +18,9 @@
package org.apache.beam.runners.core.construction;
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.runners.core.construction.PTransformTranslation.TEST_STREAM_TRANSFORM_URN;
+
import com.google.auto.service.AutoService;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
@@ -33,6 +36,8 @@ import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -57,6 +62,48 @@ public class TestStreamTranslation {
return builder.build();
}
+ private static TestStream<?> fromProto(
+ RunnerApi.TestStreamPayload testStreamPayload, RunnerApi.Components components)
+ throws IOException {
+
+ Coder<Object> coder =
+ (Coder<Object>)
+ CoderTranslation.fromProto(
+ components.getCodersOrThrow(testStreamPayload.getCoderId()), components);
+
+ List<TestStream.Event<Object>> events = new ArrayList<>();
+
+ for (RunnerApi.TestStreamPayload.Event event : testStreamPayload.getEventsList()) {
+ events.add(fromProto(event, coder));
+ }
+ return TestStream.fromRawEvents(coder, events);
+ }
+
+ /**
+ * Converts an {@link AppliedPTransform}, which may be a rehydrated transform or an original
+ * {@link TestStream}, to a {@link TestStream}.
+ */
+ public static <T> TestStream<T> getTestStream(
+ AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> application)
+ throws IOException {
+ // For robustness, we don't take this shortcut:
+ // if (application.getTransform() instanceof TestStream) {
+ // return application.getTransform();
+ // }
+
+ SdkComponents sdkComponents = SdkComponents.create();
+ RunnerApi.PTransform transformProto = PTransformTranslation.toProto(application, sdkComponents);
+ checkArgument(
+ TEST_STREAM_TRANSFORM_URN.equals(transformProto.getSpec().getUrn()),
+ "Attempt to get %s from a transform with wrong URN %s",
+ TestStream.class.getSimpleName(),
+ transformProto.getSpec().getUrn());
+ RunnerApi.TestStreamPayload testStreamPayload =
+ transformProto.getSpec().getParameter().unpack(RunnerApi.TestStreamPayload.class);
+
+ return (TestStream<T>) fromProto(testStreamPayload, sdkComponents.toComponents());
+ }
+
static <T> RunnerApi.TestStreamPayload.Event toProto(TestStream.Event<T> event, Coder<T> coder)
throws IOException {
switch (event.getType()) {
@@ -130,7 +177,7 @@ public class TestStreamTranslation {
static class TestStreamTranslator implements TransformPayloadTranslator<TestStream<?>> {
@Override
public String getUrn(TestStream<?> transform) {
- return PTransformTranslation.TEST_STREAM_TRANSFORM_URN;
+ return TEST_STREAM_TRANSFORM_URN;
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/eaaf45fa/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index 2da7a71..16c8589 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -22,6 +22,7 @@ import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import com.google.common.collect.Iterables;
+import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -30,6 +31,7 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
+import org.apache.beam.runners.core.construction.TestStreamTranslation;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.testing.TestStream;
@@ -160,7 +162,8 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
}
static class DirectTestStreamFactory<T>
- implements PTransformOverrideFactory<PBegin, PCollection<T>, TestStream<T>> {
+ implements PTransformOverrideFactory<
+ PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> {
private final DirectRunner runner;
DirectTestStreamFactory(DirectRunner runner) {
@@ -169,10 +172,17 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
@Override
public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(
- AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> transform) {
- return PTransformReplacement.of(
- transform.getPipeline().begin(),
- new DirectTestStream<T>(runner, transform.getTransform()));
+ AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> transform) {
+ try {
+ return PTransformReplacement.of(
+ transform.getPipeline().begin(),
+ new DirectTestStream<T>(runner, TestStreamTranslation.getTestStream(transform)));
+ } catch (IOException exc) {
+ throw new RuntimeException(
+ String.format(
+ "Transform could not be converted to %s", TestStream.class.getSimpleName()),
+ exc);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/eaaf45fa/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
index 9ad8fd8..d13fcf1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
@@ -271,6 +271,18 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
return events;
}
+ /**
+ * <b>For internal use only. No backwards-compatibility guarantees.</b>
+ *
+ * <p>Builds a test stream directly from events. No validation is performed on
+ * watermark monotonicity, etc. This is assumed to be a previously-serialized
+ * {@link TestStream} transform that is correct by construction.
+ */
+ @Internal
+ public static <T> TestStream<T> fromRawEvents(Coder<T> coder, List<Event<T>> events) {
+ return new TestStream<>(coder, events);
+ }
+
@Override
public boolean equals(Object other) {
if (!(other instanceof TestStream)) {
[4/5] beam git commit: Port DirectGroupByKey to SDK-agnostic APIs
Posted by ke...@apache.org.
Port DirectGroupByKey to SDK-agnostic APIs
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/02dbaefd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/02dbaefd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/02dbaefd
Branch: refs/heads/master
Commit: 02dbaefd2bbad0f0ff0b87469d184137b220fae7
Parents: 8c5b57e
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri May 26 14:27:23 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jun 9 19:56:52 2017 -0700
----------------------------------------------------------------------
.../apache/beam/runners/direct/DirectGroupByKey.java | 13 +++++++------
.../direct/DirectGroupByKeyOverrideFactory.java | 14 +++++++++++---
2 files changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/02dbaefd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
index 2fc0dd4..06b8e29 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
@@ -36,13 +36,17 @@ import org.apache.beam.sdk.values.WindowingStrategy;
class DirectGroupByKey<K, V>
extends ForwardingPTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
- private final GroupByKey<K, V> original;
+ private final PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> original;
static final String DIRECT_GBKO_URN = "urn:beam:directrunner:transforms:gbko:v1";
static final String DIRECT_GABW_URN = "urn:beam:directrunner:transforms:gabw:v1";
+ private final WindowingStrategy<?, ?> outputWindowingStrategy;
- DirectGroupByKey(GroupByKey<K, V> from) {
- this.original = from;
+ DirectGroupByKey(
+ PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> original,
+ WindowingStrategy<?, ?> outputWindowingStrategy) {
+ this.original = original;
+ this.outputWindowingStrategy = outputWindowingStrategy;
}
@Override
@@ -57,9 +61,6 @@ class DirectGroupByKey<K, V>
// key/value input elements and the window merge operation of the
// window function associated with the input PCollection.
WindowingStrategy<?, ?> inputWindowingStrategy = input.getWindowingStrategy();
- // Update the windowing strategy as appropriate.
- WindowingStrategy<?, ?> outputWindowingStrategy =
- original.updateWindowingStrategy(inputWindowingStrategy);
// By default, implement GroupByKey via a series of lower-level operations.
return input
http://git-wip-us.apache.org/repos/asf/beam/blob/02dbaefd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
index c2eb5e7..9c2de3d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
@@ -17,26 +17,34 @@
*/
package org.apache.beam.runners.direct;
+import com.google.common.collect.Iterables;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
/** A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms. */
final class DirectGroupByKeyOverrideFactory<K, V>
extends SingleInputOutputOverrideFactory<
- PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupByKey<K, V>> {
+ PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>,
+ PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>> {
@Override
public PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>
getReplacementTransform(
AppliedPTransform<
- PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupByKey<K, V>>
+ PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>,
+ PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>>
transform) {
+
+ PCollection<KV<K, Iterable<V>>> output =
+ (PCollection<KV<K, Iterable<V>>>) Iterables.getOnlyElement(transform.getOutputs().values());
+
return PTransformReplacement.of(
PTransformReplacements.getSingletonMainInput(transform),
- new DirectGroupByKey<>(transform.getTransform()));
+ new DirectGroupByKey<>(transform.getTransform(), output.getWindowingStrategy()));
}
}
[5/5] beam git commit: This closes #3338: [BEAM-2371] Port some
DirectRunner overrides to SDK-agnostic APIs
Posted by ke...@apache.org.
This closes #3338: [BEAM-2371] Port some DirectRunner overrides to SDK-agnostic APIs
Port DirectRunner TestStream override to SDK-agnostic APIs
Port DirectRunner WriteFiles override to SDK-agnostic APIs
Port DirectGroupByKey to SDK-agnostic APIs
Port ViewOverrideFactory to SDK-agnostic APIs
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1597f3ca
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1597f3ca
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1597f3ca
Branch: refs/heads/master
Commit: 1597f3ca64558f0099237aeb618b144e132ddcc6
Parents: b4c7716 eaaf45f
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Jun 9 19:57:17 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jun 9 19:57:17 2017 -0700
----------------------------------------------------------------------
.../CreatePCollectionViewTranslation.java | 4 +-
.../core/construction/PTransformMatchers.java | 17 +++++--
.../construction/TestStreamTranslation.java | 49 +++++++++++++++++++-
.../beam/runners/direct/DirectGroupByKey.java | 13 +++---
.../direct/DirectGroupByKeyOverrideFactory.java | 14 ++++--
.../direct/TestStreamEvaluatorFactory.java | 20 ++++++--
.../runners/direct/ViewOverrideFactory.java | 48 +++++++++++--------
.../direct/WriteWithShardingFactory.java | 30 ++++++++----
.../direct/ViewEvaluatorFactoryTest.java | 3 +-
.../runners/direct/ViewOverrideFactoryTest.java | 23 +++++++--
.../direct/WriteWithShardingFactoryTest.java | 26 +++++++----
.../org/apache/beam/sdk/testing/TestStream.java | 12 +++++
.../beam/sdk/values/PCollectionViews.java | 10 ++++
13 files changed, 207 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
[2/5] beam git commit: Port DirectRunner WriteFiles override to
SDK-agnostic APIs
Posted by ke...@apache.org.
Port DirectRunner WriteFiles override to SDK-agnostic APIs
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ed6bd18b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ed6bd18b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ed6bd18b
Branch: refs/heads/master
Commit: ed6bd18bffe8a51d5fc2a59ff9aaa731b196d58a
Parents: 02dbaef
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri May 26 16:07:45 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jun 9 19:56:52 2017 -0700
----------------------------------------------------------------------
.../core/construction/PTransformMatchers.java | 17 ++++++++---
.../direct/WriteWithShardingFactory.java | 30 ++++++++++++++------
.../direct/WriteWithShardingFactoryTest.java | 26 +++++++++++------
3 files changed, 52 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ed6bd18b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
index c339891..0d27241 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
@@ -17,13 +17,14 @@
*/
package org.apache.beam.runners.core.construction;
+import static org.apache.beam.runners.core.construction.PTransformTranslation.WRITE_FILES_TRANSFORM_URN;
+
import com.google.common.base.MoreObjects;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformMatcher;
import org.apache.beam.sdk.transforms.DoFn;
@@ -359,10 +360,18 @@ public class PTransformMatchers {
return new PTransformMatcher() {
@Override
public boolean matches(AppliedPTransform<?, ?, ?> application) {
- if (PTransformTranslation.WRITE_FILES_TRANSFORM_URN.equals(
+ if (WRITE_FILES_TRANSFORM_URN.equals(
PTransformTranslation.urnForTransformOrNull(application.getTransform()))) {
- WriteFiles write = (WriteFiles) application.getTransform();
- return write.getSharding() == null && write.getNumShards() == null;
+ try {
+ return WriteFilesTranslation.isRunnerDeterminedSharding(
+ (AppliedPTransform) application);
+ } catch (IOException exc) {
+ throw new RuntimeException(
+ String.format(
+ "Transform with URN %s failed to parse: %s",
+ WRITE_FILES_TRANSFORM_URN, application.getTransform()),
+ exc);
+ }
}
return false;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ed6bd18b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index 65a5a19..d8734a1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -21,11 +21,13 @@ package org.apache.beam.runners.direct;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
+import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.runners.core.construction.PTransformReplacements;
+import org.apache.beam.runners.core.construction.WriteFilesTranslation;
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
@@ -43,23 +45,33 @@ import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
/**
- * A {@link PTransformOverrideFactory} that overrides {@link WriteFiles}
- * {@link PTransform PTransforms} with an unspecified number of shards with a write with a
- * specified number of shards. The number of shards is the log base 10 of the number of input
- * records, with up to 2 additional shards.
+ * A {@link PTransformOverrideFactory} that overrides {@link WriteFiles} {@link PTransform
+ * PTransforms} with an unspecified number of shards with a write with a specified number of shards.
+ * The number of shards is the log base 10 of the number of input records, with up to 2 additional
+ * shards.
*/
class WriteWithShardingFactory<InputT>
- implements PTransformOverrideFactory<PCollection<InputT>, PDone, WriteFiles<InputT>> {
+ implements PTransformOverrideFactory<
+ PCollection<InputT>, PDone, PTransform<PCollection<InputT>, PDone>> {
static final int MAX_RANDOM_EXTRA_SHARDS = 3;
@VisibleForTesting static final int MIN_SHARDS_FOR_LOG = 3;
@Override
public PTransformReplacement<PCollection<InputT>, PDone> getReplacementTransform(
- AppliedPTransform<PCollection<InputT>, PDone, WriteFiles<InputT>> transform) {
+ AppliedPTransform<PCollection<InputT>, PDone, PTransform<PCollection<InputT>, PDone>>
+ transform) {
- return PTransformReplacement.of(
- PTransformReplacements.getSingletonMainInput(transform),
- transform.getTransform().withSharding(new LogElementShardsWithDrift<InputT>()));
+ try {
+ WriteFiles<InputT> replacement = WriteFiles.to(WriteFilesTranslation.getSink(transform));
+ if (WriteFilesTranslation.isWindowedWrites(transform)) {
+ replacement = replacement.withWindowedWrites();
+ }
+ return PTransformReplacement.of(
+ PTransformReplacements.getSingletonMainInput(transform),
+ replacement.withSharding(new LogElementShardsWithDrift<InputT>()));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/ed6bd18b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index a88d95e..41d671f 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -30,6 +30,7 @@ import static org.junit.Assert.assertThat;
import java.io.File;
import java.io.FileReader;
import java.io.Reader;
+import java.io.Serializable;
import java.nio.CharBuffer;
import java.util.ArrayList;
import java.util.Collections;
@@ -53,6 +54,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
@@ -71,11 +73,17 @@ import org.junit.runners.JUnit4;
* Tests for {@link WriteWithShardingFactory}.
*/
@RunWith(JUnit4.class)
-public class WriteWithShardingFactoryTest {
+public class WriteWithShardingFactoryTest implements Serializable {
+
private static final int INPUT_SIZE = 10000;
- @Rule public TemporaryFolder tmp = new TemporaryFolder();
- private WriteWithShardingFactory<Object> factory = new WriteWithShardingFactory<>();
- @Rule public final TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+ @Rule public transient TemporaryFolder tmp = new TemporaryFolder();
+
+ private transient WriteWithShardingFactory<Object> factory = new WriteWithShardingFactory<>();
+
+ @Rule
+ public final transient TestPipeline p =
+ TestPipeline.create().enableAbandonedNodeEnforcement(false);
@Test
public void dynamicallyReshardedWrite() throws Exception {
@@ -135,7 +143,8 @@ public class WriteWithShardingFactoryTest {
DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE,
"",
false);
- WriteFiles<Object> original =
+
+ PTransform<PCollection<Object>, PDone> original =
WriteFiles.to(
new FileBasedSink<Object>(StaticValueProvider.of(outputDirectory), policy) {
@Override
@@ -146,9 +155,10 @@ public class WriteWithShardingFactoryTest {
@SuppressWarnings("unchecked")
PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
- AppliedPTransform<PCollection<Object>, PDone, WriteFiles<Object>> originalApplication =
- AppliedPTransform.of(
- "write", objs.expand(), Collections.<TupleTag<?>, PValue>emptyMap(), original, p);
+ AppliedPTransform<PCollection<Object>, PDone, PTransform<PCollection<Object>, PDone>>
+ originalApplication =
+ AppliedPTransform.of(
+ "write", objs.expand(), Collections.<TupleTag<?>, PValue>emptyMap(), original, p);
assertThat(
factory.getReplacementTransform(originalApplication).getTransform(),
[3/5] beam git commit: Port ViewOverrideFactory to SDK-agnostic APIs
Posted by ke...@apache.org.
Port ViewOverrideFactory to SDK-agnostic APIs
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8c5b57ea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8c5b57ea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8c5b57ea
Branch: refs/heads/master
Commit: 8c5b57ea8445cd50a35c6dffb460dcf0f426e700
Parents: b4c7716
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri May 26 14:26:55 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jun 9 19:56:52 2017 -0700
----------------------------------------------------------------------
.../CreatePCollectionViewTranslation.java | 4 +-
.../runners/direct/ViewOverrideFactory.java | 48 ++++++++++++--------
.../direct/ViewEvaluatorFactoryTest.java | 3 +-
.../runners/direct/ViewOverrideFactoryTest.java | 23 ++++++++--
.../beam/sdk/values/PCollectionViews.java | 10 ++++
5 files changed, 62 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/8c5b57ea/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
index aa24909..8fc99b9 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
@@ -56,8 +56,8 @@ public class CreatePCollectionViewTranslation {
@Deprecated
public static <ElemT, ViewT> PCollectionView<ViewT> getView(
AppliedPTransform<
- PCollection<ElemT>, PCollectionView<ViewT>,
- PTransform<PCollection<ElemT>, PCollectionView<ViewT>>>
+ PCollection<ElemT>, PCollection<ElemT>,
+ PTransform<PCollection<ElemT>, PCollection<ElemT>>>
application)
throws IOException {
http://git-wip-us.apache.org/repos/asf/beam/blob/8c5b57ea/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
index 06a7388..5dcf016 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
@@ -18,8 +18,9 @@
package org.apache.beam.runners.direct;
+import java.io.IOException;
import java.util.Map;
-import org.apache.beam.runners.core.construction.ForwardingPTransform;
+import org.apache.beam.runners.core.construction.CreatePCollectionViewTranslation;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
@@ -43,16 +44,30 @@ import org.apache.beam.sdk.values.TupleTag;
*/
class ViewOverrideFactory<ElemT, ViewT>
implements PTransformOverrideFactory<
- PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>> {
+ PCollection<ElemT>, PCollection<ElemT>,
+ PTransform<PCollection<ElemT>, PCollection<ElemT>>> {
@Override
public PTransformReplacement<PCollection<ElemT>, PCollection<ElemT>> getReplacementTransform(
AppliedPTransform<
- PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>>
+ PCollection<ElemT>, PCollection<ElemT>,
+ PTransform<PCollection<ElemT>, PCollection<ElemT>>>
transform) {
- return PTransformReplacement.of(
+
+ PCollectionView<ViewT> view;
+ try {
+ view = CreatePCollectionViewTranslation.getView(transform);
+ } catch (IOException exc) {
+ throw new RuntimeException(
+ String.format(
+ "Could not extract %s from transform %s",
+ PCollectionView.class.getSimpleName(), transform),
+ exc);
+ }
+
+ return PTransformReplacement.of(
PTransformReplacements.getSingletonMainInput(transform),
- new GroupAndWriteView<>(transform.getTransform()));
+ new GroupAndWriteView<ElemT, ViewT>(view));
}
@Override
@@ -63,11 +78,11 @@ class ViewOverrideFactory<ElemT, ViewT>
/** The {@link DirectRunner} composite override for {@link CreatePCollectionView}. */
static class GroupAndWriteView<ElemT, ViewT>
- extends ForwardingPTransform<PCollection<ElemT>, PCollection<ElemT>> {
- private final CreatePCollectionView<ElemT, ViewT> og;
+ extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
+ private final PCollectionView<ViewT> view;
- private GroupAndWriteView(CreatePCollectionView<ElemT, ViewT> og) {
- this.og = og;
+ private GroupAndWriteView(PCollectionView<ViewT> view) {
+ this.view = view;
}
@Override
@@ -77,14 +92,9 @@ class ViewOverrideFactory<ElemT, ViewT>
.setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()))
.apply(GroupByKey.<Void, ElemT>create())
.apply(Values.<Iterable<ElemT>>create())
- .apply(new WriteView<ElemT, ViewT>(og));
+ .apply(new WriteView<ElemT, ViewT>(view));
return input;
}
-
- @Override
- protected PTransform<PCollection<ElemT>, PCollection<ElemT>> delegate() {
- return og;
- }
}
/**
@@ -96,10 +106,10 @@ class ViewOverrideFactory<ElemT, ViewT>
*/
static final class WriteView<ElemT, ViewT>
extends RawPTransform<PCollection<Iterable<ElemT>>, PCollection<Iterable<ElemT>>> {
- private final CreatePCollectionView<ElemT, ViewT> og;
+ private final PCollectionView<ViewT> view;
- WriteView(CreatePCollectionView<ElemT, ViewT> og) {
- this.og = og;
+ WriteView(PCollectionView<ViewT> view) {
+ this.view = view;
}
@Override
@@ -112,7 +122,7 @@ class ViewOverrideFactory<ElemT, ViewT>
@SuppressWarnings("deprecation")
public PCollectionView<ViewT> getView() {
- return og.getView();
+ return view;
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/8c5b57ea/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
index ad1aecc..5bc48b7 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
@@ -66,7 +66,8 @@ public class ViewEvaluatorFactoryTest {
.apply(GroupByKey.<Void, String>create())
.apply(Values.<Iterable<String>>create());
PCollection<Iterable<String>> view =
- concat.apply(new ViewOverrideFactory.WriteView<>(createView));
+ concat.apply(
+ new ViewOverrideFactory.WriteView<String, Iterable<String>>(createView.getView()));
EvaluationContext context = mock(EvaluationContext.class);
TestViewWriter<String, Iterable<String>> viewWriter = new TestViewWriter<>();
http://git-wip-us.apache.org/repos/asf/beam/blob/8c5b57ea/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
index 94728c7..6af9273 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.direct;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
@@ -36,8 +37,11 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.transforms.ViewFn;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
@@ -67,7 +71,7 @@ public class ViewOverrideFactoryTest implements Serializable {
factory.getReplacementTransform(
AppliedPTransform
.<PCollection<Integer>, PCollection<Integer>,
- CreatePCollectionView<Integer, List<Integer>>>
+ PTransform<PCollection<Integer>, PCollection<Integer>>>
of(
"foo",
ints.expand(),
@@ -102,7 +106,7 @@ public class ViewOverrideFactoryTest implements Serializable {
factory.getReplacementTransform(
AppliedPTransform
.<PCollection<Integer>, PCollection<Integer>,
- CreatePCollectionView<Integer, List<Integer>>>
+ PTransform<PCollection<Integer>, PCollection<Integer>>>
of(
"foo",
ints.expand(),
@@ -120,8 +124,19 @@ public class ViewOverrideFactoryTest implements Serializable {
"There should only be one WriteView primitive in the graph",
writeViewVisited.getAndSet(true),
is(false));
- PCollectionView replacementView = ((WriteView) node.getTransform()).getView();
- assertThat(replacementView, Matchers.<PCollectionView>theInstance(view));
+ PCollectionView<?> replacementView = ((WriteView) node.getTransform()).getView();
+
+ // replacementView.getPCollection() is null, but that is not a requirement
+ // so not asserted one way or the other
+ assertThat(
+ replacementView.getTagInternal(),
+ equalTo(view.getTagInternal()));
+ assertThat(
+ replacementView.getViewFn(),
+ Matchers.<ViewFn<?, ?>>equalTo(view.getViewFn()));
+ assertThat(
+ replacementView.getWindowMappingFn(),
+ Matchers.<WindowMappingFn<?>>equalTo(view.getWindowMappingFn()));
assertThat(node.getInputs().entrySet(), hasSize(1));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8c5b57ea/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
index 5e2e2c3..0c04370 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
@@ -282,6 +282,16 @@ public class PCollectionViews {
}
}));
}
+
+ @Override
+ public boolean equals(Object other) {
+ return other instanceof ListViewFn;
+ }
+
+ @Override
+ public int hashCode() {
+ return ListViewFn.class.hashCode();
+ }
}
/**