You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/02 22:45:50 UTC
[1/2] beam git commit: Allow the Distinct transform to deduplicate
elements across panes
Repository: beam
Updated Branches:
refs/heads/master 53c9bf4cd -> 9cdae6caf
Allow the Distinct transform to deduplicate elements across panes
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1bc84fb5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1bc84fb5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1bc84fb5
Branch: refs/heads/master
Commit: 1bc84fb5ff4ca087c97da45247f1e445eadc48de
Parents: 53c9bf4
Author: Reuven Lax <re...@google.com>
Authored: Tue May 16 12:12:01 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jun 2 15:42:53 2017 -0700
----------------------------------------------------------------------
.../runners/spark/SparkRunnerDebuggerTest.java | 2 +-
.../apache/beam/sdk/transforms/Distinct.java | 80 +++++++++---
.../beam/sdk/transforms/DistinctTest.java | 130 ++++++++++++++++++-
3 files changed, 188 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1bc84fb5/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
index 9009751..64ff98c 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
@@ -142,7 +142,7 @@ public class SparkRunnerDebuggerTest {
+ "_.mapPartitions(new org.apache.beam.sdk.transforms.Distinct$2())\n"
+ "_.groupByKey()\n"
+ "_.map(new org.apache.beam.sdk.transforms.Combine$IterableCombineFn())\n"
- + "_.mapPartitions(new org.apache.beam.sdk.transforms.Keys$1())\n"
+ + "_.mapPartitions(new org.apache.beam.sdk.transforms.Distinct$3())\n"
+ "_.mapPartitions(new org.apache.beam.sdk.transforms.WithKeys$2())\n"
+ "_.<org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Write>";
http://git-wip-us.apache.org/repos/asf/beam/blob/1bc84fb5/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
index 2d08cee..d751dbe 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
@@ -17,9 +17,15 @@
*/
package org.apache.beam.sdk.transforms;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* {@code Distinct<T>} takes a {@code PCollection<T>} and
@@ -59,6 +65,8 @@ import org.apache.beam.sdk.values.TypeDescriptor;
*/
public class Distinct<T> extends PTransform<PCollection<T>,
PCollection<T>> {
+ private static final Logger LOG = LoggerFactory.getLogger(Distinct.class);
+
/**
* Returns a {@code Distinct<T>} {@code PTransform}.
*
@@ -66,7 +74,7 @@ public class Distinct<T> extends PTransform<PCollection<T>,
* {@code PCollection}s
*/
public static <T> Distinct<T> create() {
- return new Distinct<T>();
+ return new Distinct<>();
}
/**
@@ -78,26 +86,48 @@ public class Distinct<T> extends PTransform<PCollection<T>,
*/
public static <T, IdT> WithRepresentativeValues<T, IdT> withRepresentativeValueFn(
SerializableFunction<T, IdT> fn) {
- return new WithRepresentativeValues<T, IdT>(fn, null);
+ return new WithRepresentativeValues<>(fn, null);
+ }
+
+ private static <T, W extends BoundedWindow> void validateWindowStrategy(
+ WindowingStrategy<T, W> strategy) {
+ if (!strategy.getWindowFn().isNonMerging()
+ && (!strategy.getTrigger().getClass().equals(DefaultTrigger.class)
+ || strategy.getAllowedLateness().isLongerThan(Duration.ZERO))) {
+ throw new UnsupportedOperationException(String.format(
+ "%s does not support merging windowing strategies, except when using the default "
+ + "trigger and zero allowed lateness.", Distinct.class.getSimpleName()));
+ }
}
@Override
public PCollection<T> expand(PCollection<T> in) {
- return in
- .apply("CreateIndex", MapElements.via(new SimpleFunction<T, KV<T, Void>>() {
- @Override
- public KV<T, Void> apply(T element) {
- return KV.of(element, (Void) null);
- }
- }))
- .apply(Combine.<T, Void>perKey(
- new SerializableFunction<Iterable<Void>, Void>() {
+ validateWindowStrategy(in.getWindowingStrategy());
+ PCollection<KV<T, Void>> combined =
+ in.apply("KeyByElement", MapElements.via(
+ new SimpleFunction<T, KV<T, Void>>() {
@Override
- public Void apply(Iterable<Void> iter) {
- return null; // ignore input
- }
+ public KV<T, Void> apply(T element) {
+ return KV.of(element, (Void) null);
+ }
}))
- .apply(Keys.<T>create());
+ .apply("DropValues",
+ Combine.<T, Void>perKey(
+ new SerializableFunction<Iterable<Void>, Void>() {
+ @Override
+ public Void apply(Iterable<Void> iter) {
+ return null; // ignore input
+ }
+ }));
+ return combined.apply("ExtractFirstKey", ParDo.of(new DoFn<KV<T, Void>, T>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ if (c.pane().isFirst()) {
+ // Only output the key if it's the first time it's been seen.
+ c.output(c.element().getKey());
+ }
+ }
+ }));
}
/**
@@ -120,22 +150,32 @@ public class Distinct<T> extends PTransform<PCollection<T>,
this.representativeType = representativeType;
}
+
@Override
public PCollection<T> expand(PCollection<T> in) {
+ validateWindowStrategy(in.getWindowingStrategy());
WithKeys<IdT, T> withKeys = WithKeys.of(fn);
if (representativeType != null) {
withKeys = withKeys.withKeyType(representativeType);
}
- return in
- .apply(withKeys)
- .apply(Combine.<IdT, T, T>perKey(
+ PCollection<KV<IdT, T>> combined = in
+ .apply("KeyByRepresentativeValue", withKeys)
+ .apply("OneValuePerKey", Combine.<IdT, T, T>perKey(
new Combine.BinaryCombineFn<T>() {
@Override
public T apply(T left, T right) {
return left;
}
- }))
- .apply(Values.<T>create());
+ }));
+ return combined.apply("KeepFirstPane", ParDo.of(new DoFn<KV<IdT, T>, T>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ // Only output the value if it's the first time it's been seen.
+ if (c.pane().isFirst()) {
+ c.output(c.element().getValue());
+ }
+ }
+ }));
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/1bc84fb5/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java
index 17bbed6..b9810c1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java
@@ -24,12 +24,25 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -85,9 +98,9 @@ public class DistinctTest {
p.run();
}
- private static class Keys implements SerializableFunction<KV<String, String>, String> {
+ private static class Keys<T> implements SerializableFunction<KV<T, String>, T> {
@Override
- public String apply(KV<String, String> input) {
+ public T apply(KV<T, String> input) {
return input.getKey();
}
}
@@ -118,11 +131,122 @@ public class DistinctTest {
PCollection<KV<String, String>> input = p.apply(Create.of(strings));
PCollection<KV<String, String>> output =
- input.apply(Distinct.withRepresentativeValueFn(new Keys()));
+ input.apply(Distinct.withRepresentativeValueFn(new Keys<String>())
+ .withRepresentativeType(TypeDescriptor.of(String.class)));
PAssert.that(output).satisfies(new Checker());
p.run();
}
+
+ @Rule
+ public TestPipeline windowedDistinctPipeline = TestPipeline.create();
+
+ @Test
+ @Category({ValidatesRunner.class, UsesTestStream.class})
+ public void testWindowedDistinct() {
+ Instant base = new Instant(0);
+ TestStream<String> values = TestStream.create(StringUtf8Coder.of())
+ .advanceWatermarkTo(base)
+ .addElements(
+ TimestampedValue.of("k1", base),
+ TimestampedValue.of("k2", base.plus(Duration.standardSeconds(10))),
+ TimestampedValue.of("k3", base.plus(Duration.standardSeconds(20))),
+ TimestampedValue.of("k1", base.plus(Duration.standardSeconds(30))),
+ TimestampedValue.of("k2", base.plus(Duration.standardSeconds(40))),
+ TimestampedValue.of("k3", base.plus(Duration.standardSeconds(50))),
+ TimestampedValue.of("k4", base.plus(Duration.standardSeconds(60))),
+ TimestampedValue.of("k5", base.plus(Duration.standardSeconds(70))),
+ TimestampedValue.of("k6", base.plus(Duration.standardSeconds(80))))
+ .advanceWatermarkToInfinity();
+
+ PCollection<String> distinctValues = windowedDistinctPipeline
+ .apply(values)
+ .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(30))))
+ .apply(Distinct.<String>create());
+ PAssert.that(distinctValues)
+ .inWindow(new IntervalWindow(base, base.plus(Duration.standardSeconds(30))))
+ .containsInAnyOrder("k1", "k2", "k3");
+ PAssert.that(distinctValues)
+ .inWindow(new IntervalWindow(base.plus(
+ Duration.standardSeconds(30)), base.plus(Duration.standardSeconds(60))))
+ .containsInAnyOrder("k1", "k2", "k3");
+ PAssert.that(distinctValues)
+ .inWindow(new IntervalWindow(base.plus(
+ Duration.standardSeconds(60)), base.plus(Duration.standardSeconds(90))))
+ .containsInAnyOrder("k4", "k5", "k6");
+ windowedDistinctPipeline.run();
+ }
+
+ @Rule
+ public TestPipeline triggeredDistinctPipeline = TestPipeline.create();
+
+ @Test
+ @Category({ValidatesRunner.class, UsesTestStream.class})
+ public void testTriggeredDistinct() {
+ Instant base = new Instant(0);
+ TestStream<String> values = TestStream.create(StringUtf8Coder.of())
+ .advanceWatermarkTo(base)
+ .addElements(
+ TimestampedValue.of("k1", base),
+ TimestampedValue.of("k2", base.plus(Duration.standardSeconds(10))),
+ TimestampedValue.of("k3", base.plus(Duration.standardSeconds(20))))
+ .advanceProcessingTime(Duration.standardMinutes(1))
+ .addElements(
+ TimestampedValue.of("k1", base.plus(Duration.standardSeconds(30))),
+ TimestampedValue.of("k2", base.plus(Duration.standardSeconds(40))),
+ TimestampedValue.of("k3", base.plus(Duration.standardSeconds(50))))
+ .advanceWatermarkToInfinity();
+
+ PCollection<String> distinctValues = triggeredDistinctPipeline
+ .apply(values)
+ .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
+ .triggering(Repeatedly.forever(
+ AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
+ Duration.standardSeconds(30))))
+ .withAllowedLateness(Duration.ZERO)
+ .accumulatingFiredPanes())
+ .apply(Distinct.<String>create());
+ PAssert.that(distinctValues).containsInAnyOrder("k1", "k2", "k3");
+ triggeredDistinctPipeline.run();
+ }
+
+ @Rule
+ public TestPipeline triggeredDistinctRepresentativePipeline = TestPipeline.create();
+
+ @Test
+ @Category({ValidatesRunner.class, UsesTestStream.class})
+ public void testTriggeredDistinctRepresentativeValues() {
+ Instant base = new Instant(0);
+ TestStream<KV<Integer, String>> values = TestStream.create(
+ KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()))
+ .advanceWatermarkTo(base)
+ .addElements(
+ TimestampedValue.of(KV.of(1, "k1"), base),
+ TimestampedValue.of(KV.of(2, "k2"), base.plus(Duration.standardSeconds(10))),
+ TimestampedValue.of(KV.of(3, "k3"), base.plus(Duration.standardSeconds(20))))
+ .advanceProcessingTime(Duration.standardMinutes(1))
+ .addElements(
+ TimestampedValue.of(KV.of(1, "k1"), base.plus(Duration.standardSeconds(30))),
+ TimestampedValue.of(KV.of(2, "k2"), base.plus(Duration.standardSeconds(40))),
+ TimestampedValue.of(KV.of(3, "k3"), base.plus(Duration.standardSeconds(50))))
+ .advanceWatermarkToInfinity();
+
+ PCollection<KV<Integer, String>> distinctValues = triggeredDistinctRepresentativePipeline
+ .apply(values)
+ .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(1)))
+ .triggering(Repeatedly.forever(
+ AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
+ Duration.standardSeconds(30))))
+ .withAllowedLateness(Duration.ZERO)
+ .accumulatingFiredPanes())
+ .apply(Distinct.withRepresentativeValueFn(new Keys<Integer>())
+ .withRepresentativeType(TypeDescriptor.of(Integer.class)));
+
+
+ PAssert.that(distinctValues).containsInAnyOrder(
+ KV.of(1, "k1"), KV.of(2, "k2"), KV.of(3, "k3"));
+ triggeredDistinctRepresentativePipeline.run();
+ }
}
[2/2] beam git commit: This closes #3165: Allow the Distinct
transform to deduplicate elements across panes
Posted by ke...@apache.org.
This closes #3165: Allow the Distinct transform to deduplicate elements across panes
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9cdae6ca
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9cdae6ca
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9cdae6ca
Branch: refs/heads/master
Commit: 9cdae6caf4c466dcc012a10380da219c18b56472
Parents: 53c9bf4 1bc84fb
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Jun 2 15:44:53 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jun 2 15:44:53 2017 -0700
----------------------------------------------------------------------
.../runners/spark/SparkRunnerDebuggerTest.java | 2 +-
.../apache/beam/sdk/transforms/Distinct.java | 80 +++++++++---
.../beam/sdk/transforms/DistinctTest.java | 130 ++++++++++++++++++-
3 files changed, 188 insertions(+), 24 deletions(-)
----------------------------------------------------------------------