You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/10 03:14:00 UTC

[1/5] beam git commit: Port DirectRunner TestStream override to SDK-agnostic APIs

Repository: beam
Updated Branches:
  refs/heads/master b4c77167f -> 1597f3ca6


Port DirectRunner TestStream override to SDK-agnostic APIs


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/eaaf45fa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/eaaf45fa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/eaaf45fa

Branch: refs/heads/master
Commit: eaaf45fa33d500a9f0fd0c2861aac4889ee5086c
Parents: ed6bd18
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 8 13:39:32 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jun 9 19:56:52 2017 -0700

----------------------------------------------------------------------
 .../construction/TestStreamTranslation.java     | 49 +++++++++++++++++++-
 .../direct/TestStreamEvaluatorFactory.java      | 20 ++++++--
 .../org/apache/beam/sdk/testing/TestStream.java | 12 +++++
 3 files changed, 75 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/eaaf45fa/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
index 90e6304..515de57 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
@@ -18,6 +18,9 @@
 
 package org.apache.beam.runners.core.construction;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.runners.core.construction.PTransformTranslation.TEST_STREAM_TRANSFORM_URN;
+
 import com.google.auto.service.AutoService;
 import com.google.protobuf.Any;
 import com.google.protobuf.ByteString;
@@ -33,6 +36,8 @@ import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -57,6 +62,48 @@ public class TestStreamTranslation {
     return builder.build();
   }
 
+  private static TestStream<?> fromProto(
+      RunnerApi.TestStreamPayload testStreamPayload, RunnerApi.Components components)
+      throws IOException {
+
+    Coder<Object> coder =
+        (Coder<Object>)
+            CoderTranslation.fromProto(
+                components.getCodersOrThrow(testStreamPayload.getCoderId()), components);
+
+    List<TestStream.Event<Object>> events = new ArrayList<>();
+
+    for (RunnerApi.TestStreamPayload.Event event : testStreamPayload.getEventsList()) {
+      events.add(fromProto(event, coder));
+    }
+    return TestStream.fromRawEvents(coder, events);
+  }
+
+  /**
+   * Converts an {@link AppliedPTransform}, which may be a rehydrated transform or an original
+   * {@link TestStream}, to a {@link TestStream}.
+   */
+  public static <T> TestStream<T> getTestStream(
+      AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> application)
+      throws IOException {
+    // For robustness, we don't take this shortcut:
+    // if (application.getTransform() instanceof TestStream) {
+    //   return application.getTransform()
+    // }
+
+    SdkComponents sdkComponents = SdkComponents.create();
+    RunnerApi.PTransform transformProto = PTransformTranslation.toProto(application, sdkComponents);
+    checkArgument(
+        TEST_STREAM_TRANSFORM_URN.equals(transformProto.getSpec().getUrn()),
+        "Attempt to get %s from a transform with wrong URN %s",
+        TestStream.class.getSimpleName(),
+        transformProto.getSpec().getUrn());
+    RunnerApi.TestStreamPayload testStreamPayload =
+        transformProto.getSpec().getParameter().unpack(RunnerApi.TestStreamPayload.class);
+
+    return (TestStream<T>) fromProto(testStreamPayload, sdkComponents.toComponents());
+  }
+
   static <T> RunnerApi.TestStreamPayload.Event toProto(TestStream.Event<T> event, Coder<T> coder)
       throws IOException {
     switch (event.getType()) {
@@ -130,7 +177,7 @@ public class TestStreamTranslation {
   static class TestStreamTranslator implements TransformPayloadTranslator<TestStream<?>> {
     @Override
     public String getUrn(TestStream<?> transform) {
-      return PTransformTranslation.TEST_STREAM_TRANSFORM_URN;
+      return TEST_STREAM_TRANSFORM_URN;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/eaaf45fa/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index 2da7a71..16c8589 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -22,6 +22,7 @@ import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
 import com.google.common.collect.Iterables;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -30,6 +31,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
+import org.apache.beam.runners.core.construction.TestStreamTranslation;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.testing.TestStream;
@@ -160,7 +162,8 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
   }
 
   static class DirectTestStreamFactory<T>
-      implements PTransformOverrideFactory<PBegin, PCollection<T>, TestStream<T>> {
+      implements PTransformOverrideFactory<
+          PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> {
     private final DirectRunner runner;
 
     DirectTestStreamFactory(DirectRunner runner) {
@@ -169,10 +172,17 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
 
     @Override
     public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(
-        AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> transform) {
-      return PTransformReplacement.of(
-          transform.getPipeline().begin(),
-          new DirectTestStream<T>(runner, transform.getTransform()));
+        AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> transform) {
+      try {
+        return PTransformReplacement.of(
+            transform.getPipeline().begin(),
+            new DirectTestStream<T>(runner, TestStreamTranslation.getTestStream(transform)));
+      } catch (IOException exc) {
+        throw new RuntimeException(
+            String.format(
+                "Transform could not be converted to %s", TestStream.class.getSimpleName()),
+            exc);
+      }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/eaaf45fa/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
index 9ad8fd8..d13fcf1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
@@ -271,6 +271,18 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
     return events;
   }
 
+  /**
+   * <b>For internal use only. No backwards-compatibility guarantees.</b>
+   *
+   * <p>Builds a test stream directly from events. No validation is performed on
+   * watermark monotonicity, etc. This is assumed to be a previously-serialized
+   * {@link TestStream} transform that is correct by construction.
+   */
+  @Internal
+  public static <T> TestStream<T> fromRawEvents(Coder<T> coder, List<Event<T>> events) {
+    return new TestStream<>(coder, events);
+  }
+
   @Override
   public boolean equals(Object other) {
     if (!(other instanceof TestStream)) {


[4/5] beam git commit: Port DirectGroupByKey to SDK-agnostic APIs

Posted by ke...@apache.org.
Port DirectGroupByKey to SDK-agnostic APIs


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/02dbaefd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/02dbaefd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/02dbaefd

Branch: refs/heads/master
Commit: 02dbaefd2bbad0f0ff0b87469d184137b220fae7
Parents: 8c5b57e
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri May 26 14:27:23 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jun 9 19:56:52 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/direct/DirectGroupByKey.java  | 13 +++++++------
 .../direct/DirectGroupByKeyOverrideFactory.java       | 14 +++++++++++---
 2 files changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/02dbaefd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
index 2fc0dd4..06b8e29 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
@@ -36,13 +36,17 @@ import org.apache.beam.sdk.values.WindowingStrategy;
 
 class DirectGroupByKey<K, V>
     extends ForwardingPTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
-  private final GroupByKey<K, V> original;
+  private final PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> original;
 
   static final String DIRECT_GBKO_URN = "urn:beam:directrunner:transforms:gbko:v1";
   static final String DIRECT_GABW_URN = "urn:beam:directrunner:transforms:gabw:v1";
+  private final WindowingStrategy<?, ?> outputWindowingStrategy;
 
-  DirectGroupByKey(GroupByKey<K, V> from) {
-    this.original = from;
+  DirectGroupByKey(
+      PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> original,
+      WindowingStrategy<?, ?> outputWindowingStrategy) {
+    this.original = original;
+    this.outputWindowingStrategy = outputWindowingStrategy;
   }
 
   @Override
@@ -57,9 +61,6 @@ class DirectGroupByKey<K, V>
     // key/value input elements and the window merge operation of the
     // window function associated with the input PCollection.
     WindowingStrategy<?, ?> inputWindowingStrategy = input.getWindowingStrategy();
-    // Update the windowing strategy as appropriate.
-    WindowingStrategy<?, ?> outputWindowingStrategy =
-        original.updateWindowingStrategy(inputWindowingStrategy);
 
     // By default, implement GroupByKey via a series of lower-level operations.
     return input

http://git-wip-us.apache.org/repos/asf/beam/blob/02dbaefd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
index c2eb5e7..9c2de3d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
@@ -17,26 +17,34 @@
  */
 package org.apache.beam.runners.direct;
 
+import com.google.common.collect.Iterables;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
 /** A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms. */
 final class DirectGroupByKeyOverrideFactory<K, V>
     extends SingleInputOutputOverrideFactory<
-        PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupByKey<K, V>> {
+        PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>,
+        PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>> {
   @Override
   public PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>
       getReplacementTransform(
           AppliedPTransform<
-                  PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupByKey<K, V>>
+                  PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>,
+                  PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>>
               transform) {
+
+    PCollection<KV<K, Iterable<V>>> output =
+        (PCollection<KV<K, Iterable<V>>>) Iterables.getOnlyElement(transform.getOutputs().values());
+
     return PTransformReplacement.of(
         PTransformReplacements.getSingletonMainInput(transform),
-        new DirectGroupByKey<>(transform.getTransform()));
+        new DirectGroupByKey<>(transform.getTransform(), output.getWindowingStrategy()));
   }
 }


[5/5] beam git commit: This closes #3338: [BEAM-2371] Port some DirectRunner overrides to SDK-agnostic APIs

Posted by ke...@apache.org.
This closes #3338: [BEAM-2371] Port some DirectRunner overrides to SDK-agnostic APIs

  Port DirectRunner TestStream override to SDK-agnostic APIs
  Port DirectRunner WriteFiles override to SDK-agnostic APIs
  Port DirectGroupByKey to SDK-agnostic APIs
  Port ViewOverrideFactory to SDK-agnostic APIs


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1597f3ca
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1597f3ca
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1597f3ca

Branch: refs/heads/master
Commit: 1597f3ca64558f0099237aeb618b144e132ddcc6
Parents: b4c7716 eaaf45f
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Jun 9 19:57:17 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jun 9 19:57:17 2017 -0700

----------------------------------------------------------------------
 .../CreatePCollectionViewTranslation.java       |  4 +-
 .../core/construction/PTransformMatchers.java   | 17 +++++--
 .../construction/TestStreamTranslation.java     | 49 +++++++++++++++++++-
 .../beam/runners/direct/DirectGroupByKey.java   | 13 +++---
 .../direct/DirectGroupByKeyOverrideFactory.java | 14 ++++--
 .../direct/TestStreamEvaluatorFactory.java      | 20 ++++++--
 .../runners/direct/ViewOverrideFactory.java     | 48 +++++++++++--------
 .../direct/WriteWithShardingFactory.java        | 30 ++++++++----
 .../direct/ViewEvaluatorFactoryTest.java        |  3 +-
 .../runners/direct/ViewOverrideFactoryTest.java | 23 +++++++--
 .../direct/WriteWithShardingFactoryTest.java    | 26 +++++++----
 .../org/apache/beam/sdk/testing/TestStream.java | 12 +++++
 .../beam/sdk/values/PCollectionViews.java       | 10 ++++
 13 files changed, 207 insertions(+), 62 deletions(-)
----------------------------------------------------------------------



[2/5] beam git commit: Port DirectRunner WriteFiles override to SDK-agnostic APIs

Posted by ke...@apache.org.
Port DirectRunner WriteFiles override to SDK-agnostic APIs


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ed6bd18b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ed6bd18b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ed6bd18b

Branch: refs/heads/master
Commit: ed6bd18bffe8a51d5fc2a59ff9aaa731b196d58a
Parents: 02dbaef
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri May 26 16:07:45 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jun 9 19:56:52 2017 -0700

----------------------------------------------------------------------
 .../core/construction/PTransformMatchers.java   | 17 ++++++++---
 .../direct/WriteWithShardingFactory.java        | 30 ++++++++++++++------
 .../direct/WriteWithShardingFactoryTest.java    | 26 +++++++++++------
 3 files changed, 52 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ed6bd18b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
index c339891..0d27241 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
@@ -17,13 +17,14 @@
  */
 package org.apache.beam.runners.core.construction;
 
+import static org.apache.beam.runners.core.construction.PTransformTranslation.WRITE_FILES_TRANSFORM_URN;
+
 import com.google.common.base.MoreObjects;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformMatcher;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -359,10 +360,18 @@ public class PTransformMatchers {
     return new PTransformMatcher() {
       @Override
       public boolean matches(AppliedPTransform<?, ?, ?> application) {
-        if (PTransformTranslation.WRITE_FILES_TRANSFORM_URN.equals(
+        if (WRITE_FILES_TRANSFORM_URN.equals(
             PTransformTranslation.urnForTransformOrNull(application.getTransform()))) {
-          WriteFiles write = (WriteFiles) application.getTransform();
-          return write.getSharding() == null && write.getNumShards() == null;
+          try {
+            return WriteFilesTranslation.isRunnerDeterminedSharding(
+                (AppliedPTransform) application);
+          } catch (IOException exc) {
+            throw new RuntimeException(
+                String.format(
+                    "Transform with URN %s failed to parse: %s",
+                    WRITE_FILES_TRANSFORM_URN, application.getTransform()),
+                exc);
+          }
         }
         return false;
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/ed6bd18b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index 65a5a19..d8734a1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -21,11 +21,13 @@ package org.apache.beam.runners.direct;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
+import org.apache.beam.runners.core.construction.WriteFilesTranslation;
 import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
@@ -43,23 +45,33 @@ import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 
 /**
- * A {@link PTransformOverrideFactory} that overrides {@link WriteFiles}
- * {@link PTransform PTransforms} with an unspecified number of shards with a write with a
- * specified number of shards. The number of shards is the log base 10 of the number of input
- * records, with up to 2 additional shards.
+ * A {@link PTransformOverrideFactory} that overrides {@link WriteFiles} {@link PTransform
+ * PTransforms} with an unspecified number of shards with a write with a specified number of shards.
+ * The number of shards is the log base 10 of the number of input records, with up to 2 additional
+ * shards.
  */
 class WriteWithShardingFactory<InputT>
-    implements PTransformOverrideFactory<PCollection<InputT>, PDone, WriteFiles<InputT>> {
+    implements PTransformOverrideFactory<
+        PCollection<InputT>, PDone, PTransform<PCollection<InputT>, PDone>> {
   static final int MAX_RANDOM_EXTRA_SHARDS = 3;
   @VisibleForTesting static final int MIN_SHARDS_FOR_LOG = 3;
 
   @Override
   public PTransformReplacement<PCollection<InputT>, PDone> getReplacementTransform(
-      AppliedPTransform<PCollection<InputT>, PDone, WriteFiles<InputT>> transform) {
+      AppliedPTransform<PCollection<InputT>, PDone, PTransform<PCollection<InputT>, PDone>>
+          transform) {
 
-    return PTransformReplacement.of(
-        PTransformReplacements.getSingletonMainInput(transform),
-        transform.getTransform().withSharding(new LogElementShardsWithDrift<InputT>()));
+    try {
+      WriteFiles<InputT> replacement = WriteFiles.to(WriteFilesTranslation.getSink(transform));
+      if (WriteFilesTranslation.isWindowedWrites(transform)) {
+        replacement = replacement.withWindowedWrites();
+      }
+      return PTransformReplacement.of(
+          PTransformReplacements.getSingletonMainInput(transform),
+          replacement.withSharding(new LogElementShardsWithDrift<InputT>()));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/ed6bd18b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index a88d95e..41d671f 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -30,6 +30,7 @@ import static org.junit.Assert.assertThat;
 import java.io.File;
 import java.io.FileReader;
 import java.io.Reader;
+import java.io.Serializable;
 import java.nio.CharBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -53,6 +54,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -71,11 +73,17 @@ import org.junit.runners.JUnit4;
  * Tests for {@link WriteWithShardingFactory}.
  */
 @RunWith(JUnit4.class)
-public class WriteWithShardingFactoryTest {
+public class WriteWithShardingFactoryTest implements Serializable {
+
   private static final int INPUT_SIZE = 10000;
-  @Rule public TemporaryFolder tmp = new TemporaryFolder();
-  private WriteWithShardingFactory<Object> factory = new WriteWithShardingFactory<>();
-  @Rule public final TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+  @Rule public transient TemporaryFolder tmp = new TemporaryFolder();
+
+  private transient WriteWithShardingFactory<Object> factory = new WriteWithShardingFactory<>();
+
+  @Rule
+  public final transient TestPipeline p =
+      TestPipeline.create().enableAbandonedNodeEnforcement(false);
 
   @Test
   public void dynamicallyReshardedWrite() throws Exception {
@@ -135,7 +143,8 @@ public class WriteWithShardingFactoryTest {
             DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE,
             "",
             false);
-    WriteFiles<Object> original =
+
+    PTransform<PCollection<Object>, PDone> original =
         WriteFiles.to(
             new FileBasedSink<Object>(StaticValueProvider.of(outputDirectory), policy) {
               @Override
@@ -146,9 +155,10 @@ public class WriteWithShardingFactoryTest {
     @SuppressWarnings("unchecked")
     PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
 
-    AppliedPTransform<PCollection<Object>, PDone, WriteFiles<Object>> originalApplication =
-        AppliedPTransform.of(
-            "write", objs.expand(), Collections.<TupleTag<?>, PValue>emptyMap(), original, p);
+    AppliedPTransform<PCollection<Object>, PDone, PTransform<PCollection<Object>, PDone>>
+        originalApplication =
+            AppliedPTransform.of(
+                "write", objs.expand(), Collections.<TupleTag<?>, PValue>emptyMap(), original, p);
 
     assertThat(
         factory.getReplacementTransform(originalApplication).getTransform(),


[3/5] beam git commit: Port ViewOverrideFactory to SDK-agnostic APIs

Posted by ke...@apache.org.
Port ViewOverrideFactory to SDK-agnostic APIs


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8c5b57ea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8c5b57ea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8c5b57ea

Branch: refs/heads/master
Commit: 8c5b57ea8445cd50a35c6dffb460dcf0f426e700
Parents: b4c7716
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri May 26 14:26:55 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jun 9 19:56:52 2017 -0700

----------------------------------------------------------------------
 .../CreatePCollectionViewTranslation.java       |  4 +-
 .../runners/direct/ViewOverrideFactory.java     | 48 ++++++++++++--------
 .../direct/ViewEvaluatorFactoryTest.java        |  3 +-
 .../runners/direct/ViewOverrideFactoryTest.java | 23 ++++++++--
 .../beam/sdk/values/PCollectionViews.java       | 10 ++++
 5 files changed, 62 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8c5b57ea/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
index aa24909..8fc99b9 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
@@ -56,8 +56,8 @@ public class CreatePCollectionViewTranslation {
   @Deprecated
   public static <ElemT, ViewT> PCollectionView<ViewT> getView(
       AppliedPTransform<
-              PCollection<ElemT>, PCollectionView<ViewT>,
-              PTransform<PCollection<ElemT>, PCollectionView<ViewT>>>
+              PCollection<ElemT>, PCollection<ElemT>,
+              PTransform<PCollection<ElemT>, PCollection<ElemT>>>
           application)
       throws IOException {
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8c5b57ea/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
index 06a7388..5dcf016 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
@@ -18,8 +18,9 @@
 
 package org.apache.beam.runners.direct;
 
+import java.io.IOException;
 import java.util.Map;
-import org.apache.beam.runners.core.construction.ForwardingPTransform;
+import org.apache.beam.runners.core.construction.CreatePCollectionViewTranslation;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
@@ -43,16 +44,30 @@ import org.apache.beam.sdk.values.TupleTag;
  */
 class ViewOverrideFactory<ElemT, ViewT>
     implements PTransformOverrideFactory<
-        PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>> {
+    PCollection<ElemT>, PCollection<ElemT>,
+        PTransform<PCollection<ElemT>, PCollection<ElemT>>> {
 
   @Override
   public PTransformReplacement<PCollection<ElemT>, PCollection<ElemT>> getReplacementTransform(
       AppliedPTransform<
-              PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>>
+              PCollection<ElemT>, PCollection<ElemT>,
+              PTransform<PCollection<ElemT>, PCollection<ElemT>>>
           transform) {
-    return PTransformReplacement.of(
+
+    PCollectionView<ViewT> view;
+    try {
+      view = CreatePCollectionViewTranslation.getView(transform);
+    } catch (IOException exc) {
+      throw new RuntimeException(
+          String.format(
+              "Could not extract %s from transform %s",
+              PCollectionView.class.getSimpleName(), transform),
+          exc);
+    }
+
+      return PTransformReplacement.of(
         PTransformReplacements.getSingletonMainInput(transform),
-        new GroupAndWriteView<>(transform.getTransform()));
+        new GroupAndWriteView<ElemT, ViewT>(view));
   }
 
   @Override
@@ -63,11 +78,11 @@ class ViewOverrideFactory<ElemT, ViewT>
 
   /** The {@link DirectRunner} composite override for {@link CreatePCollectionView}. */
   static class GroupAndWriteView<ElemT, ViewT>
-      extends ForwardingPTransform<PCollection<ElemT>, PCollection<ElemT>> {
-    private final CreatePCollectionView<ElemT, ViewT> og;
+      extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
+    private final PCollectionView<ViewT> view;
 
-    private GroupAndWriteView(CreatePCollectionView<ElemT, ViewT> og) {
-      this.og = og;
+    private GroupAndWriteView(PCollectionView<ViewT> view) {
+      this.view = view;
     }
 
     @Override
@@ -77,14 +92,9 @@ class ViewOverrideFactory<ElemT, ViewT>
           .setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()))
           .apply(GroupByKey.<Void, ElemT>create())
           .apply(Values.<Iterable<ElemT>>create())
-          .apply(new WriteView<ElemT, ViewT>(og));
+          .apply(new WriteView<ElemT, ViewT>(view));
       return input;
     }
-
-    @Override
-    protected PTransform<PCollection<ElemT>, PCollection<ElemT>> delegate() {
-      return og;
-    }
   }
 
   /**
@@ -96,10 +106,10 @@ class ViewOverrideFactory<ElemT, ViewT>
    */
   static final class WriteView<ElemT, ViewT>
       extends RawPTransform<PCollection<Iterable<ElemT>>, PCollection<Iterable<ElemT>>> {
-    private final CreatePCollectionView<ElemT, ViewT> og;
+    private final PCollectionView<ViewT> view;
 
-    WriteView(CreatePCollectionView<ElemT, ViewT> og) {
-      this.og = og;
+    WriteView(PCollectionView<ViewT> view) {
+      this.view = view;
     }
 
     @Override
@@ -112,7 +122,7 @@ class ViewOverrideFactory<ElemT, ViewT>
 
-    @SuppressWarnings("deprecation")
+    /** Returns the {@link PCollectionView} this transform was constructed with. */
     public PCollectionView<ViewT> getView() {
-      return og.getView();
+      return view;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/8c5b57ea/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
index ad1aecc..5bc48b7 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
@@ -66,7 +66,8 @@ public class ViewEvaluatorFactoryTest {
             .apply(GroupByKey.<Void, String>create())
             .apply(Values.<Iterable<String>>create());
     PCollection<Iterable<String>> view =
-        concat.apply(new ViewOverrideFactory.WriteView<>(createView));
+        concat.apply(
+            new ViewOverrideFactory.WriteView<String, Iterable<String>>(createView.getView()));
 
     EvaluationContext context = mock(EvaluationContext.class);
     TestViewWriter<String, Iterable<String>> viewWriter = new TestViewWriter<>();

http://git-wip-us.apache.org/repos/asf/beam/blob/8c5b57ea/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
index 94728c7..6af9273 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.beam.runners.direct;
 
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
@@ -36,8 +37,11 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.transforms.ViewFn;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PCollectionViews;
@@ -67,7 +71,7 @@ public class ViewOverrideFactoryTest implements Serializable {
             factory.getReplacementTransform(
                 AppliedPTransform
                     .<PCollection<Integer>, PCollection<Integer>,
-                        CreatePCollectionView<Integer, List<Integer>>>
+                        PTransform<PCollection<Integer>, PCollection<Integer>>>
                         of(
                             "foo",
                             ints.expand(),
@@ -102,7 +106,7 @@ public class ViewOverrideFactoryTest implements Serializable {
         factory.getReplacementTransform(
             AppliedPTransform
                 .<PCollection<Integer>, PCollection<Integer>,
-                    CreatePCollectionView<Integer, List<Integer>>>
+                    PTransform<PCollection<Integer>, PCollection<Integer>>>
                     of(
                         "foo",
                         ints.expand(),
@@ -120,8 +124,19 @@ public class ViewOverrideFactoryTest implements Serializable {
                   "There should only be one WriteView primitive in the graph",
                   writeViewVisited.getAndSet(true),
                   is(false));
-              PCollectionView replacementView = ((WriteView) node.getTransform()).getView();
-              assertThat(replacementView, Matchers.<PCollectionView>theInstance(view));
+              PCollectionView<?> replacementView = ((WriteView) node.getTransform()).getView();
+
+              // replacementView.getPCollection() is null, but that is not a requirement
+              // so not asserted one way or the other
+              assertThat(
+                  replacementView.getTagInternal(),
+                  equalTo(view.getTagInternal()));
+              assertThat(
+                  replacementView.getViewFn(),
+                  Matchers.<ViewFn<?, ?>>equalTo(view.getViewFn()));
+              assertThat(
+                  replacementView.getWindowMappingFn(),
+                  Matchers.<WindowMappingFn<?>>equalTo(view.getWindowMappingFn()));
               assertThat(node.getInputs().entrySet(), hasSize(1));
             }
           }

http://git-wip-us.apache.org/repos/asf/beam/blob/8c5b57ea/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
index 5e2e2c3..0c04370 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
@@ -282,6 +282,16 @@ public class PCollectionViews {
             }
           }));
     }
+
+    @Override
+    public boolean equals(Object other) {
+      return other instanceof ListViewFn;
+    }
+
+    @Override
+    public int hashCode() {
+      return ListViewFn.class.hashCode();
+    }
   }
 
   /**