You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/03/27 04:07:18 UTC
[1/2] incubator-beam git commit: [BEAM-109] fix support for
FixedWindows and SlidingWindows in batch
Repository: incubator-beam
Updated Branches:
refs/heads/master 49d82baf1 -> 706fc5376
[BEAM-109] fix support for FixedWindows and SlidingWindows in batch
[BEAM-109] Better testing for FixedWindows and SlidingWindows
[BEAM-109] lower counts are unordered, so it is better to compare the entire result rather than just the iterator head
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/18242651
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/18242651
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/18242651
Branch: refs/heads/master
Commit: 182426516c5184e126888d3b465dc41a1bb312aa
Parents: 49d82ba
Author: Sela <an...@paypal.com>
Authored: Sat Mar 19 12:25:02 2016 +0200
Committer: Sela <an...@paypal.com>
Committed: Sat Mar 26 18:58:33 2016 -0700
----------------------------------------------------------------------
.../spark/translation/TransformTranslator.java | 22 ++++---
.../translation/MultiOutputWordCountTest.java | 19 +++++-
.../translation/WindowedWordCountTest.java | 63 ++++++++++++++++----
3 files changed, 82 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/18242651/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 0bd047c..7f72235 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -30,6 +30,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
+import com.google.api.client.util.Lists;
import com.google.api.client.util.Maps;
import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
import com.google.cloud.dataflow.sdk.coders.Coder;
@@ -79,6 +80,7 @@ import org.apache.spark.api.java.JavaRDDLike;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
@@ -267,15 +269,21 @@ public final class TransformTranslator {
// Key has to be windowed in order to group by window as well
JavaPairRDD<WindowedValue<K>, WindowedValue<KV<K, VI>>> inRddDuplicatedKeyPair =
- inRdd.mapToPair(
- new PairFunction<WindowedValue<KV<K, VI>>, WindowedValue<K>,
+ inRdd.flatMapToPair(
+ new PairFlatMapFunction<WindowedValue<KV<K, VI>>, WindowedValue<K>,
WindowedValue<KV<K, VI>>>() {
@Override
- public Tuple2<WindowedValue<K>,
- WindowedValue<KV<K, VI>>> call(WindowedValue<KV<K, VI>> kv) {
- WindowedValue<K> wk = WindowedValue.of(kv.getValue().getKey(),
- kv.getTimestamp(), kv.getWindows(), kv.getPane());
- return new Tuple2<>(wk, kv);
+ public Iterable<Tuple2<WindowedValue<K>,
+ WindowedValue<KV<K, VI>>>> call(WindowedValue<KV<K, VI>> kv) {
+ List<Tuple2<WindowedValue<K>,
+ WindowedValue<KV<K, VI>>>> tuple2s =
+ Lists.newArrayListWithCapacity(kv.getWindows().size());
+ for (BoundedWindow boundedWindow: kv.getWindows()) {
+ WindowedValue<K> wk = WindowedValue.of(kv.getValue().getKey(),
+ boundedWindow.maxTimestamp(), boundedWindow, kv.getPane());
+ tuple2s.add(new Tuple2<>(wk, kv));
+ }
+ return tuple2s;
}
});
//-- windowed coders
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/18242651/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
index 8ab3798..974467f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
@@ -22,14 +22,18 @@ import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.AggregatorValues;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
import com.google.cloud.dataflow.sdk.transforms.*;
import com.google.cloud.dataflow.sdk.values.*;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.beam.runners.spark.EvaluationResult;
import org.apache.beam.runners.spark.SparkPipelineRunner;
import org.junit.Assert;
import org.junit.Test;
+import java.util.Set;
+
public class MultiOutputWordCountTest {
private static final TupleTag<String> upper = new TupleTag<>();
@@ -37,6 +41,10 @@ public class MultiOutputWordCountTest {
private static final TupleTag<KV<String, Long>> lowerCnts = new TupleTag<>();
private static final TupleTag<KV<String, Long>> upperCnts = new TupleTag<>();
+ private static final Set<String> EXPECTED_LOWER_COUNTS =
+ ImmutableSet.of("are: 2", "some: 3", "words: 3", "more: 2", "to: 1", "count: 1", "and: 2",
+ "even: 1", "others: 1");
+
@Test
public void testRun() throws Exception {
Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
@@ -53,8 +61,8 @@ public class MultiOutputWordCountTest {
ApproximateUnique.<KV<String, Long>>globally(16));
EvaluationResult res = SparkPipelineRunner.create().run(p);
- Iterable<KV<String, Long>> actualLower = res.get(luc.get(lowerCnts));
- Assert.assertEquals("are", actualLower.iterator().next().getKey());
+ DataflowAssert.that(luc.get(lowerCnts).apply(ParDo.of(new FormatCountsFn())))
+ .containsInAnyOrder(EXPECTED_LOWER_COUNTS);
Iterable<KV<String, Long>> actualUpper = res.get(luc.get(upperCnts));
Assert.assertEquals("Here", actualUpper.iterator().next().getKey());
Iterable<Long> actualUniqCount = res.get(unique);
@@ -134,4 +142,11 @@ public class MultiOutputWordCountTest {
return extractWordsFn.totalWords;
}
}
+
+ private static class FormatCountsFn extends DoFn<KV<String, Long>, String> {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(c.element().getKey() + ": " + c.element().getValue());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/18242651/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
index 9f29a37..9fac9c6 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
@@ -24,6 +24,7 @@ import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.common.collect.ImmutableList;
@@ -39,30 +40,66 @@ import org.junit.Test;
public class WindowedWordCountTest {
private static final String[] WORDS_ARRAY = {
- "hi there", "hi", "hi sue bob",
- "hi sue", "", "bob hi"};
+ "hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"};
private static final Long[] TIMESTAMPS_ARRAY = {
- 60000L, 60000L, 60000L,
- 120000L, 120000L, 120000L};
+ 60000L, 60000L, 60000L, 179000L, 179000L, 179000L};
private static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
private static final List<Long> TIMESTAMPS = Arrays.asList(TIMESTAMPS_ARRAY);
- private static final List<String> EXPECTED_COUNT_SET =
- ImmutableList.of("hi: 3", "there: 1", "sue: 1", "bob: 1",
- "hi: 2", "sue: 1", "bob: 1");
+
+ private static final List<String> EXPECTED_FIXED_SEPARATE_COUNT_SET =
+ ImmutableList.of("hi: 3", "there: 1", "sue: 1", "bob: 1", "hi: 2", "sue: 1", "bob: 1");
+
+ @Test
+ public void testFixed() throws Exception {
+ Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
+ PCollection<String> inputWords =
+ p.apply(Create.timestamped(WORDS, TIMESTAMPS)).setCoder(StringUtf8Coder.of());
+ PCollection<String> windowedWords =
+ inputWords.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
+
+ PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords());
+
+ DataflowAssert.that(output).containsInAnyOrder(EXPECTED_FIXED_SEPARATE_COUNT_SET);
+
+ EvaluationResult res = SparkPipelineRunner.create().run(p);
+ res.close();
+ }
+
+ private static final List<String> EXPECTED_FIXED_SAME_COUNT_SET =
+ ImmutableList.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
+
+ @Test
+ public void testFixed2() throws Exception {
+ Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
+ PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS))
+ .setCoder(StringUtf8Coder.of());
+ PCollection<String> windowedWords = inputWords
+ .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))));
+
+ PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords());
+
+ DataflowAssert.that(output).containsInAnyOrder(EXPECTED_FIXED_SAME_COUNT_SET);
+
+ EvaluationResult res = SparkPipelineRunner.create().run(p);
+ res.close();
+ }
+
+ private static final List<String> EXPECTED_SLIDING_COUNT_SET =
+ ImmutableList.of("hi: 3", "there: 1", "sue: 1", "bob: 1", "hi: 5", "there: 1", "sue: 2",
+ "bob: 2", "hi: 2", "sue: 1", "bob: 1");
@Test
- public void testRun() throws Exception {
- SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
- options.setRunner(SparkPipelineRunner.class);
+ public void testSliding() throws Exception {
Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS))
- .setCoder(StringUtf8Coder.of());
+ .setCoder(StringUtf8Coder.of());
PCollection<String> windowedWords = inputWords
- .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
+ .apply(Window.<String>into(SlidingWindows.of(Duration.standardMinutes(2))
+ .every(Duration.standardMinutes(1))));
PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords());
- DataflowAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
+ DataflowAssert.that(output).containsInAnyOrder(EXPECTED_SLIDING_COUNT_SET);
EvaluationResult res = SparkPipelineRunner.create().run(p);
res.close();
[2/2] incubator-beam git commit: [BEAM-109] Combine.PerKey ignores
grouping also by windows
Posted by am...@apache.org.
[BEAM-109] Combine.PerKey ignores grouping also by windows
This closes #63
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/706fc537
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/706fc537
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/706fc537
Branch: refs/heads/master
Commit: 706fc537645c64adf87f9003fc6d8e43b0ece843
Parents: 49d82ba 1824265
Author: Sela <an...@paypal.com>
Authored: Sat Mar 26 19:01:42 2016 -0700
Committer: Sela <an...@paypal.com>
Committed: Sat Mar 26 19:01:42 2016 -0700
----------------------------------------------------------------------
.../spark/translation/TransformTranslator.java | 22 ++++---
.../translation/MultiOutputWordCountTest.java | 19 +++++-
.../translation/WindowedWordCountTest.java | 63 ++++++++++++++++----
3 files changed, 82 insertions(+), 22 deletions(-)
----------------------------------------------------------------------