You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/17 19:40:23 UTC
[1/2] incubator-beam git commit: Fix SplittableParDoTest
Repository: incubator-beam
Updated Branches:
refs/heads/master 5747951ff -> e849d95d1
Fix SplittableParDoTest
This required fixing GBKIntoKeyedWorkItems to properly set the coder on
the primitive, and updating the assertions to match the actual (and
correct) behavior.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/577d04ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/577d04ab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/577d04ab
Branch: refs/heads/master
Commit: 577d04ab0799b18d5c2c88e2250859678f589968
Parents: b8e6eea
Author: bchambers <bc...@google.com>
Authored: Mon Oct 17 12:35:03 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Mon Oct 17 12:35:03 2016 -0700
----------------------------------------------------------------------
.../beam/runners/core/GBKIntoKeyedWorkItems.java | 17 ++++++++++++++++-
.../beam/runners/core/SplittableParDoTest.java | 10 +++++++---
2 files changed, 23 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/577d04ab/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java
index ca4d681..304e349 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java
@@ -17,10 +17,15 @@
*/
package org.apache.beam.runners.core;
+import static com.google.common.base.Preconditions.checkArgument;
+
import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.KeyedWorkItemCoder;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -34,7 +39,17 @@ public class GBKIntoKeyedWorkItems<KeyT, InputT>
extends PTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> {
@Override
public PCollection<KeyedWorkItem<KeyT, InputT>> apply(PCollection<KV<KeyT, InputT>> input) {
- return PCollection.createPrimitiveOutputInternal(
+ checkArgument(input.getCoder() instanceof KvCoder,
+ "Expected input coder to be KvCoder, but was %s",
+ input.getCoder().getClass().getSimpleName());
+
+ KvCoder<KeyT, InputT> kvCoder = (KvCoder<KeyT, InputT>) input.getCoder();
+ Coder<KeyedWorkItem<KeyT, InputT>> coder = KeyedWorkItemCoder.of(
+ kvCoder.getKeyCoder(), kvCoder.getValueCoder(),
+ input.getWindowingStrategy().getWindowFn().windowCoder());
+ PCollection<KeyedWorkItem<KeyT, InputT>> collection = PCollection.createPrimitiveOutputInternal(
input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+ collection.setCoder((Coder) coder);
+ return collection;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/577d04ab/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index a76c4da..b7cdc64 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -128,12 +128,14 @@ public class SplittableParDoTest {
Pipeline pipeline = TestPipeline.create();
DoFn<Integer, String> boundedFn = new BoundedFakeFn();
assertEquals(
+ "Applying a bounded SDF to a bounded collection produces a bounded collection",
PCollection.IsBounded.BOUNDED,
makeBoundedCollection(pipeline)
.apply("bounded to bounded", new SplittableParDo<>(boundedFn))
.isBounded());
assertEquals(
- PCollection.IsBounded.BOUNDED,
+ "Applying a bounded SDF to an unbounded collection produces an unbounded collection",
+ PCollection.IsBounded.UNBOUNDED,
makeUnboundedCollection(pipeline)
.apply("bounded to unbounded", new SplittableParDo<>(boundedFn))
.isBounded());
@@ -145,12 +147,14 @@ public class SplittableParDoTest {
Pipeline pipeline = TestPipeline.create();
DoFn<Integer, String> unboundedFn = new UnboundedFakeFn();
assertEquals(
- PCollection.IsBounded.BOUNDED,
+ "Applying an unbounded SDF to a bounded collection produces an unbounded collection",
+ PCollection.IsBounded.UNBOUNDED,
makeBoundedCollection(pipeline)
.apply("unbounded to bounded", new SplittableParDo<>(unboundedFn))
.isBounded());
assertEquals(
- PCollection.IsBounded.BOUNDED,
+ "Applying an unbounded SDF to an unbounded collection produces an unbounded collection",
+ PCollection.IsBounded.UNBOUNDED,
makeUnboundedCollection(pipeline)
.apply("unbounded to unbounded", new SplittableParDo<>(unboundedFn))
.isBounded());
[2/2] incubator-beam git commit: This closes #1117
Posted by ke...@apache.org.
This closes #1117
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e849d95d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e849d95d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e849d95d
Branch: refs/heads/master
Commit: e849d95d1b5175c4e758345be278644a38769910
Parents: 5747951 577d04a
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 17 12:40:11 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 12:40:11 2016 -0700
----------------------------------------------------------------------
.../beam/runners/core/GBKIntoKeyedWorkItems.java | 17 ++++++++++++++++-
.../beam/runners/core/SplittableParDoTest.java | 10 +++++++---
2 files changed, 23 insertions(+), 4 deletions(-)
----------------------------------------------------------------------