You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/02 22:45:50 UTC

[1/2] beam git commit: Allow the Distinct transform to deduplicate elements across panes

Repository: beam
Updated Branches:
  refs/heads/master 53c9bf4cd -> 9cdae6caf


Allow the Distinct transform to deduplicate elements across panes


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1bc84fb5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1bc84fb5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1bc84fb5

Branch: refs/heads/master
Commit: 1bc84fb5ff4ca087c97da45247f1e445eadc48de
Parents: 53c9bf4
Author: Reuven Lax <re...@google.com>
Authored: Tue May 16 12:12:01 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jun 2 15:42:53 2017 -0700

----------------------------------------------------------------------
 .../runners/spark/SparkRunnerDebuggerTest.java  |   2 +-
 .../apache/beam/sdk/transforms/Distinct.java    |  80 +++++++++---
 .../beam/sdk/transforms/DistinctTest.java       | 130 ++++++++++++++++++-
 3 files changed, 188 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1bc84fb5/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
index 9009751..64ff98c 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
@@ -142,7 +142,7 @@ public class SparkRunnerDebuggerTest {
         + "_.mapPartitions(new org.apache.beam.sdk.transforms.Distinct$2())\n"
         + "_.groupByKey()\n"
         + "_.map(new org.apache.beam.sdk.transforms.Combine$IterableCombineFn())\n"
-        + "_.mapPartitions(new org.apache.beam.sdk.transforms.Keys$1())\n"
+        + "_.mapPartitions(new org.apache.beam.sdk.transforms.Distinct$3())\n"
         + "_.mapPartitions(new org.apache.beam.sdk.transforms.WithKeys$2())\n"
         + "_.<org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Write>";
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc84fb5/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
index 2d08cee..d751dbe 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
@@ -17,9 +17,15 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * {@code Distinct<T>} takes a {@code PCollection<T>} and
@@ -59,6 +65,8 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  */
 public class Distinct<T> extends PTransform<PCollection<T>,
                                                     PCollection<T>> {
+  private static final Logger LOG = LoggerFactory.getLogger(Distinct.class);
+
   /**
    * Returns a {@code Distinct<T>} {@code PTransform}.
    *
@@ -66,7 +74,7 @@ public class Distinct<T> extends PTransform<PCollection<T>,
    * {@code PCollection}s
    */
   public static <T> Distinct<T> create() {
-    return new Distinct<T>();
+    return new Distinct<>();
   }
 
   /**
@@ -78,26 +86,48 @@ public class Distinct<T> extends PTransform<PCollection<T>,
    */
   public static <T, IdT> WithRepresentativeValues<T, IdT> withRepresentativeValueFn(
       SerializableFunction<T, IdT> fn) {
-    return new WithRepresentativeValues<T, IdT>(fn, null);
+    return new WithRepresentativeValues<>(fn, null);
+  }
+
+  /** Rejects merging windows that use a non-default trigger or positive allowed lateness. */
+  private static <T, W extends BoundedWindow> void validateWindowStrategy(
+      WindowingStrategy<T, W> strategy) {
+    if (!strategy.getWindowFn().isNonMerging()
+        && (!strategy.getTrigger().getClass().equals(DefaultTrigger.class)
+            || strategy.getAllowedLateness().isLongerThan(Duration.ZERO))) {
+      throw new UnsupportedOperationException(Distinct.class.getSimpleName() + " supports merging "
+          + "windowing strategies only with the default trigger and zero allowed lateness.");
+    }
   }
 
   @Override
   public PCollection<T> expand(PCollection<T> in) {
-    return in
-        .apply("CreateIndex", MapElements.via(new SimpleFunction<T, KV<T, Void>>() {
-          @Override
-          public KV<T, Void> apply(T element) {
-            return KV.of(element, (Void) null);
-          }
-        }))
-        .apply(Combine.<T, Void>perKey(
-            new SerializableFunction<Iterable<Void>, Void>() {
+    validateWindowStrategy(in.getWindowingStrategy());
+    PCollection<KV<T, Void>> combined =
+        in.apply("KeyByElement", MapElements.via(
+            new SimpleFunction<T, KV<T, Void>>() {
               @Override
-              public Void apply(Iterable<Void> iter) {
-                return null; // ignore input
-                }
+              public KV<T, Void> apply(T element) {
+                return KV.of(element, (Void) null);
+              }
             }))
-        .apply(Keys.<T>create());
+            .apply("DropValues",
+                Combine.<T, Void>perKey(
+                    new SerializableFunction<Iterable<Void>, Void>() {
+                      @Override
+                      public Void apply(Iterable<Void> iter) {
+                        return null; // Values are all null; only the key is used downstream.
+                      }
+                    }));
+    return combined.apply("ExtractFirstKey", ParDo.of(new DoFn<KV<T, Void>, T>() {
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        if (c.pane().isFirst()) {
+          // Later panes for this key and window repeat an already-emitted key, so drop them.
+          c.output(c.element().getKey());
+        }
+      }
+    }));
   }
 
   /**
@@ -120,22 +150,32 @@ public class Distinct<T> extends PTransform<PCollection<T>,
       this.representativeType = representativeType;
     }
 
+    /** Emits one element per representative value per window, from each key's first pane only. */
     @Override
     public PCollection<T> expand(PCollection<T> in) {
+      validateWindowStrategy(in.getWindowingStrategy());
       WithKeys<IdT, T> withKeys = WithKeys.of(fn);
       if (representativeType != null) {
         withKeys = withKeys.withKeyType(representativeType);
       }
-      return in
-          .apply(withKeys)
-          .apply(Combine.<IdT, T, T>perKey(
+      PCollection<KV<IdT, T>> combined = in
+          .apply("KeyByRepresentativeValue", withKeys)
+          .apply("OneValuePerKey", Combine.<IdT, T, T>perKey(
               new Combine.BinaryCombineFn<T>() {
                 @Override
                 public T apply(T left, T right) {
                   return left;
                 }
-              }))
-          .apply(Values.<T>create());
+              }));
+      return combined.apply("KeepFirstPane", ParDo.of(new DoFn<KV<IdT, T>, T>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          // Later panes for this key and window repeat an already-emitted value, so drop them.
+          if (c.pane().isFirst()) {
+            c.output(c.element().getValue());
+          }
+        }
+      }));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc84fb5/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java
index 17bbed6..b9810c1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java
@@ -24,12 +24,25 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.UsesTestStream;
 import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -85,9 +98,9 @@ public class DistinctTest {
     p.run();
   }
 
-  private static class Keys implements SerializableFunction<KV<String, String>, String> {
+  private static class Keys<T> implements SerializableFunction<KV<T, String>, T> {
     @Override
-    public String apply(KV<String, String> input) {
+    public T apply(KV<T, String> input) {
       return input.getKey();
     }
   }
@@ -118,11 +131,122 @@ public class DistinctTest {
     PCollection<KV<String, String>> input = p.apply(Create.of(strings));
 
     PCollection<KV<String, String>> output =
-        input.apply(Distinct.withRepresentativeValueFn(new Keys()));
+        input.apply(Distinct.withRepresentativeValueFn(new Keys<String>())
+            .withRepresentativeType(TypeDescriptor.of(String.class)));
 
 
     PAssert.that(output).satisfies(new Checker());
 
     p.run();
   }
+
+  @Rule
+  public TestPipeline windowedDistinctPipeline = TestPipeline.create();
+
+  @Test
+  @Category({ValidatesRunner.class, UsesTestStream.class})
+  public void testWindowedDistinct() {
+    Instant base = new Instant(0);
+    TestStream<String> values = TestStream.create(StringUtf8Coder.of())
+        .advanceWatermarkTo(base)
+        .addElements(
+            TimestampedValue.of("k1", base),
+            TimestampedValue.of("k2", base.plus(Duration.standardSeconds(10))),
+            TimestampedValue.of("k3", base.plus(Duration.standardSeconds(20))),
+            TimestampedValue.of("k1", base.plus(Duration.standardSeconds(30))),
+            TimestampedValue.of("k2", base.plus(Duration.standardSeconds(40))),
+            TimestampedValue.of("k3", base.plus(Duration.standardSeconds(50))),
+            TimestampedValue.of("k4", base.plus(Duration.standardSeconds(60))),
+            TimestampedValue.of("k5", base.plus(Duration.standardSeconds(70))),
+            TimestampedValue.of("k6", base.plus(Duration.standardSeconds(80))))
+        .advanceWatermarkToInfinity();
+    // Distinct is per window: k1-k3 are expected in each of the first two 30s windows.
+    PCollection<String> distinctValues = windowedDistinctPipeline
+        .apply(values)
+        .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(30))))
+        .apply(Distinct.<String>create());
+    PAssert.that(distinctValues)
+        .inWindow(new IntervalWindow(base, base.plus(Duration.standardSeconds(30))))
+        .containsInAnyOrder("k1", "k2", "k3");
+    PAssert.that(distinctValues)
+        .inWindow(new IntervalWindow(base.plus(
+            Duration.standardSeconds(30)), base.plus(Duration.standardSeconds(60))))
+        .containsInAnyOrder("k1", "k2", "k3");
+    PAssert.that(distinctValues)
+        .inWindow(new IntervalWindow(base.plus(
+            Duration.standardSeconds(60)), base.plus(Duration.standardSeconds(90))))
+        .containsInAnyOrder("k4", "k5", "k6");
+    windowedDistinctPipeline.run();
+  }
+
+  @Rule
+  public TestPipeline triggeredDistinctPipeline = TestPipeline.create();
+
+  @Test
+  @Category({ValidatesRunner.class, UsesTestStream.class})
+  public void testTriggeredDistinct() {
+    Instant base = new Instant(0);
+    TestStream<String> values = TestStream.create(StringUtf8Coder.of())
+        .advanceWatermarkTo(base)
+        .addElements(
+            TimestampedValue.of("k1", base),
+            TimestampedValue.of("k2", base.plus(Duration.standardSeconds(10))),
+            TimestampedValue.of("k3", base.plus(Duration.standardSeconds(20))))
+        .advanceProcessingTime(Duration.standardMinutes(1))
+        .addElements(
+            TimestampedValue.of("k1", base.plus(Duration.standardSeconds(30))),
+            TimestampedValue.of("k2", base.plus(Duration.standardSeconds(40))),
+            TimestampedValue.of("k3", base.plus(Duration.standardSeconds(50))))
+        .advanceWatermarkToInfinity();
+    // Both batches fall in window [0s, 60s); repeats of k1-k3 may arrive in later panes.
+    PCollection<String> distinctValues = triggeredDistinctPipeline
+        .apply(values)
+        .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
+            .triggering(Repeatedly.forever(
+                AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
+                    Duration.standardSeconds(30))))
+            .withAllowedLateness(Duration.ZERO)
+            .accumulatingFiredPanes())
+        .apply(Distinct.<String>create());
+    PAssert.that(distinctValues).containsInAnyOrder("k1", "k2", "k3");
+    triggeredDistinctPipeline.run();
+  }
+
+  @Rule
+  public TestPipeline triggeredDistinctRepresentativePipeline = TestPipeline.create();
+
+  @Test
+  @Category({ValidatesRunner.class, UsesTestStream.class})
+  public void testTriggeredDistinctRepresentativeValues() {
+    Instant base = new Instant(0);
+    TestStream<KV<Integer, String>> values = TestStream.create(
+        KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()))
+        .advanceWatermarkTo(base)
+        .addElements(
+            TimestampedValue.of(KV.of(1, "k1"), base),
+            TimestampedValue.of(KV.of(2, "k2"), base.plus(Duration.standardSeconds(10))),
+            TimestampedValue.of(KV.of(3, "k3"), base.plus(Duration.standardSeconds(20))))
+        .advanceProcessingTime(Duration.standardMinutes(1))
+        .addElements(
+            TimestampedValue.of(KV.of(1, "k1"), base.plus(Duration.standardSeconds(30))),
+            TimestampedValue.of(KV.of(2, "k2"), base.plus(Duration.standardSeconds(40))),
+            TimestampedValue.of(KV.of(3, "k3"), base.plus(Duration.standardSeconds(50))))
+        .advanceWatermarkToInfinity();
+
+    PCollection<KV<Integer, String>> distinctValues = triggeredDistinctRepresentativePipeline
+        .apply(values)
+        .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(1)))
+            .triggering(Repeatedly.forever(
+                AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
+                    Duration.standardSeconds(30))))
+            .withAllowedLateness(Duration.ZERO)
+            .accumulatingFiredPanes())
+        .apply(Distinct.withRepresentativeValueFn(new Keys<Integer>())
+            .withRepresentativeType(TypeDescriptor.of(Integer.class)));
+
+    // Each key arrives in both batches, possibly in different panes; expect it exactly once.
+    PAssert.that(distinctValues).containsInAnyOrder(
+        KV.of(1, "k1"), KV.of(2, "k2"), KV.of(3, "k3"));
+    triggeredDistinctRepresentativePipeline.run();
+  }
 }


[2/2] beam git commit: This closes #3165: Allow the Distinct transform to deduplicate elements across panes

Posted by ke...@apache.org.
This closes #3165: Allow the Distinct transform to deduplicate elements across panes


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9cdae6ca
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9cdae6ca
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9cdae6ca

Branch: refs/heads/master
Commit: 9cdae6caf4c466dcc012a10380da219c18b56472
Parents: 53c9bf4 1bc84fb
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Jun 2 15:44:53 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jun 2 15:44:53 2017 -0700

----------------------------------------------------------------------
 .../runners/spark/SparkRunnerDebuggerTest.java  |   2 +-
 .../apache/beam/sdk/transforms/Distinct.java    |  80 +++++++++---
 .../beam/sdk/transforms/DistinctTest.java       | 130 ++++++++++++++++++-
 3 files changed, 188 insertions(+), 24 deletions(-)
----------------------------------------------------------------------