You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/15 03:02:41 UTC

[1/2] incubator-beam git commit: Closes #181

Repository: incubator-beam
Updated Branches:
  refs/heads/master 5bdea1e2b -> 8dc9032ce


Closes #181


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8dc9032c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8dc9032c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8dc9032c

Branch: refs/heads/master
Commit: 8dc9032ce2cf870340dd922bc2643e48e84d1d5f
Parents: 5bdea1e b15424c
Author: Dan Halperin <dh...@google.com>
Authored: Thu Apr 14 18:02:07 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Apr 14 18:02:07 2016 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/beam/sdk/io/Write.java |  9 +-
 .../java/org/apache/beam/sdk/io/WriteTest.java  | 95 ++++++++++++++++----
 2 files changed, 80 insertions(+), 24 deletions(-)
----------------------------------------------------------------------



[2/2] incubator-beam git commit: [BEAM-188] Write: apply GlobalWindows first

Posted by dh...@apache.org.
[BEAM-188] Write: apply GlobalWindows first

Also, do not supply an explicit timestamp when outputting writer results.

Note that this is safe because the functions in the Writer cannot access the window
or timestamp. When we add per-window (or similar) functions to the sinks, we will
likely do so at a higher level.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b15424ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b15424ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b15424ce

Branch: refs/heads/master
Commit: b15424cede856230d62ad102fa5a586d8f5e10ca
Parents: 5bdea1e
Author: Dan Halperin <dh...@google.com>
Authored: Thu Apr 14 17:23:42 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Apr 14 18:02:07 2016 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/beam/sdk/io/Write.java |  9 +-
 .../java/org/apache/beam/sdk/io/WriteTest.java  | 95 ++++++++++++++++----
 2 files changed, 80 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15424ce/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
index a8a2517..67761cd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
@@ -35,8 +35,6 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
 
-import org.joda.time.Instant;
-
 import java.util.UUID;
 
 /**
@@ -154,6 +152,7 @@ public class Write {
       // There is a dependency between this ParDo and the first (the WriteOperation PCollection
       // as a side input), so this will happen after the initial ParDo.
       PCollection<WriteT> results = input
+          .apply(Window.<T>into(new GlobalWindows()))
           .apply("WriteBundles", ParDo.of(new DoFn<T, WriteT>() {
             // Writer that will write the records in this bundle. Lazily
             // initialized in processElement.
@@ -184,13 +183,11 @@ public class Write {
             public void finishBundle(Context c) throws Exception {
               if (writer != null) {
                 WriteT result = writer.close();
-                // Output the result of the write.
-                c.outputWithTimestamp(result, Instant.now());
+                c.output(result);
               }
             }
           }).withSideInputs(writeOperationView))
-          .setCoder(writeOperation.getWriterResultCoder())
-          .apply(Window.<WriteT>into(new GlobalWindows()));
+          .setCoder(writeOperation.getWriterResultCoder());
 
       final PCollectionView<Iterable<WriteT>> resultsView =
           results.apply(View.<WriteT>asIterable());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15424ce/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
index 865dbe1..af0676a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
@@ -35,8 +35,16 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.PipelineOptionsFactoryTest.TestPipelineOptions;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
 import com.google.common.base.MoreObjects;
@@ -54,6 +62,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 
 /**
  * Tests for the Write PTransform.
@@ -61,7 +70,47 @@ import java.util.UUID;
 @RunWith(JUnit4.class)
 public class WriteTest {
   // Static store that can be accessed within the writer
-  static List<String> sinkContents = new ArrayList<>();
+  private static List<String> sinkContents = new ArrayList<>();
+
+  private static final MapElements<String, String> IDENTITY_MAP =
+      MapElements.via(new SimpleFunction<String, String>() {
+        @Override
+        public String apply(String input) {
+          return input;
+        }
+      });
+
+  private static class WindowAndReshuffle<T> extends PTransform<PCollection<T>, PCollection<T>> {
+    private final Window.Bound<T> window;
+    public WindowAndReshuffle(Window.Bound<T> window) {
+      this.window = window;
+    }
+
+    private static class AddArbitraryKey<T> extends DoFn<T, KV<Integer, T>> {
+      @Override
+      public void processElement(ProcessContext c) throws Exception {
+        c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element()));
+      }
+    }
+
+    private static class RemoveArbitraryKey<T> extends DoFn<KV<Integer, Iterable<T>>, T> {
+      @Override
+      public void processElement(ProcessContext c) throws Exception {
+        for (T s : c.element().getValue()) {
+          c.output(s);
+        }
+      }
+    }
+
+    @Override
+    public PCollection<T> apply(PCollection<T> input) {
+      return input
+          .apply(window)
+          .apply(ParDo.of(new AddArbitraryKey<T>()))
+          .apply(GroupByKey.<Integer, T>create())
+          .apply(ParDo.of(new RemoveArbitraryKey<T>()));
+    }
+  }
 
   /**
    * Test a Write transform with a PCollection of elements.
@@ -70,7 +119,7 @@ public class WriteTest {
   public void testWrite() {
     List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle",
         "Intimidating pigeon", "Pedantic gull", "Frisky finch");
-    runWrite(inputs, /* not windowed */ false);
+    runWrite(inputs, IDENTITY_MAP);
   }
 
   /**
@@ -79,7 +128,7 @@ public class WriteTest {
   @Test
   public void testWriteWithEmptyPCollection() {
     List<String> inputs = new ArrayList<>();
-    runWrite(inputs, /* not windowed */ false);
+    runWrite(inputs, IDENTITY_MAP);
   }
 
   /**
@@ -89,7 +138,21 @@ public class WriteTest {
   public void testWriteWindowed() {
     List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle",
         "Intimidating pigeon", "Pedantic gull", "Frisky finch");
-    runWrite(inputs, /* windowed */ true);
+    runWrite(
+        inputs, new WindowAndReshuffle(Window.<String>into(FixedWindows.of(Duration.millis(2)))));
+  }
+
+  /**
+   * Test a Write with sessions.
+   */
+  @Test
+  public void testWriteWithSessions() {
+    List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle",
+        "Intimidating pigeon", "Pedantic gull", "Frisky finch");
+
+    runWrite(
+        inputs,
+        new WindowAndReshuffle(Window.<String>into(Sessions.withGapDuration(Duration.millis(1)))));
   }
 
   /**
@@ -97,7 +160,8 @@ public class WriteTest {
    * a test sink in the correct order, as well as verifies that the elements of a PCollection are
    * written to the sink.
    */
-  public void runWrite(List<String> inputs, boolean windowed) {
+  private static void runWrite(
+      List<String> inputs, PTransform<PCollection<String>, PCollection<String>> transform) {
     // Flag to validate that the pipeline options are passed to the Sink
     String[] args = {"--testFlag=test_value"};
     PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(WriteOptions.class);
@@ -106,21 +170,16 @@ public class WriteTest {
     // Clear the sink's contents.
     sinkContents.clear();
 
-    // Construct the input PCollection and test Sink.
-    PCollection<String> input;
-    if (windowed) {
-      List<Long> timestamps = new ArrayList<>();
-      for (long i = 0; i < inputs.size(); i++) {
-        timestamps.add(i + 1);
-      }
-      input = p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of()))
-               .apply(Window.<String>into(FixedWindows.of(new Duration(2))));
-    } else {
-      input = p.apply(Create.of(inputs).withCoder(StringUtf8Coder.of()));
+    // Prepare timestamps for the elements.
+    List<Long> timestamps = new ArrayList<>();
+    for (long i = 0; i < inputs.size(); i++) {
+      timestamps.add(i + 1);
     }
-    TestSink sink = new TestSink();
 
-    input.apply(Write.to(sink));
+    TestSink sink = new TestSink();
+    p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of()))
+     .apply(transform)
+     .apply(Write.to(sink));
 
     p.run();
     assertThat(sinkContents, containsInAnyOrder(inputs.toArray()));