You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/25 18:17:02 UTC
[1/2] incubator-beam git commit: Encode bundle elements in the
DirectRunner
Repository: incubator-beam
Updated Branches:
refs/heads/master c03e3e926 -> f603d43e0
Encode bundle elements in the DirectRunner
This ensures that any changes caused when an element is encoded
and decoded are caught within the pipeline.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2ceaa3ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2ceaa3ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2ceaa3ef
Branch: refs/heads/master
Commit: 2ceaa3effa8a6d9de3753a05db9d1648e8eed576
Parents: c03e3e9
Author: Thomas Groh <tg...@google.com>
Authored: Tue Sep 20 11:43:40 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Oct 25 11:03:43 2016 -0700
----------------------------------------------------------------------
.../runners/direct/CloningBundleFactory.java | 98 ++++++++++
.../beam/runners/direct/DirectRunner.java | 5 +-
.../direct/ImmutableListBundleFactory.java | 4 +-
.../direct/CloningBundleFactoryTest.java | 177 +++++++++++++++++++
.../EncodabilityEnforcementFactoryTest.java | 6 +-
5 files changed, 285 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ceaa3ef/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java
new file mode 100644
index 0000000..33241e3
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Instant;
+
+/**
+ * A {@link BundleFactory} where a created {@link UncommittedBundle} clones all elements added to it
+ * using the coder of the {@link PCollection}.
+ */
+class CloningBundleFactory implements BundleFactory {
+ private static final CloningBundleFactory INSTANCE = new CloningBundleFactory();
+
+ public static CloningBundleFactory create() { // always returns the shared INSTANCE
+ return INSTANCE;
+ }
+
+ private final ImmutableListBundleFactory underlying; // bundle creation is delegated here
+ private CloningBundleFactory() {
+ this.underlying = ImmutableListBundleFactory.create();
+ }
+
+ @Override
+ public <T> UncommittedBundle<T> createRootBundle() {
+ // Not cloned: the DirectRunner is responsible for these elements; they need not be encodable.
+ return underlying.createRootBundle();
+ }
+
+ @Override
+ public <T> UncommittedBundle<T> createBundle(
+ PCollection<T> output) {
+ return new CloningBundle<>(underlying.createBundle(output));
+ }
+
+ @Override
+ public <K, T> UncommittedBundle<T> createKeyedBundle(
+ StructuralKey<K> key, PCollection<T> output) {
+ return new CloningBundle<>(underlying.createKeyedBundle(key, output));
+ }
+
+ private static class CloningBundle<T> implements UncommittedBundle<T> {
+ private final UncommittedBundle<T> underlying; // receives the cloned elements
+ private final Coder<T> coder;
+
+ private CloningBundle(UncommittedBundle<T> underlying) {
+ this.underlying = underlying;
+ this.coder = underlying.getPCollection().getCoder(); // resolved once, used by add()
+ }
+
+ @Override
+ public PCollection<T> getPCollection() {
+ return underlying.getPCollection();
+ }
+
+ @Override
+ public UncommittedBundle<T> add(WindowedValue<T> element) {
+ try {
+ // Add the round-tripped copy rather than the original element, so that a coder which
+ // behaves poorly (e.g. a NoOpCoder that does not expect to be used) is reflected in the
+ // values given to downstream transforms.
+ WindowedValue<T> clone = element.withValue(CoderUtils.clone(coder, element.getValue()));
+ underlying.add(clone);
+ } catch (CoderException e) {
+ throw UserCodeException.wrap(e); // report coder failures as user-code errors
+ }
+ return this;
+ }
+
+ @Override
+ public CommittedBundle<T> commit(Instant synchronizedProcessingTime) {
+ return underlying.commit(synchronizedProcessingTime); // elements already cloned in add()
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ceaa3ef/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index b79a42f..e02c8a6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -319,7 +319,10 @@ public class DirectRunner
}
private BundleFactory createBundleFactory(DirectOptions pipelineOptions) {
- BundleFactory bundleFactory = ImmutableListBundleFactory.create();
+ BundleFactory bundleFactory =
+ pipelineOptions.isEnforceEncodability()
+ ? CloningBundleFactory.create()
+ : ImmutableListBundleFactory.create();
if (pipelineOptions.isEnforceImmutability()) {
bundleFactory = ImmutabilityCheckingBundleFactory.create(bundleFactory);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ceaa3ef/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
index db92542..abc6dd8 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
@@ -32,8 +32,10 @@ import org.joda.time.Instant;
* A factory that produces bundles that perform no additional validation.
*/
class ImmutableListBundleFactory implements BundleFactory {
+ private static final ImmutableListBundleFactory FACTORY = new ImmutableListBundleFactory();
+
public static ImmutableListBundleFactory create() {
- return new ImmutableListBundleFactory();
+ return FACTORY;
}
private ImmutableListBundleFactory() {}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ceaa3ef/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
new file mode 100644
index 0000000..03846d9
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.isA;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.theInstance;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.EncodabilityEnforcementFactoryTest.Record;
+import org.apache.beam.runners.direct.EncodabilityEnforcementFactoryTest.RecordNoDecodeCoder;
+import org.apache.beam.runners.direct.EncodabilityEnforcementFactoryTest.RecordNoEncodeCoder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link CloningBundleFactory}.
+ */
+@RunWith(JUnit4.class)
+public class CloningBundleFactoryTest {
+ @Rule public ExpectedException thrown = ExpectedException.none();
+ private CloningBundleFactory factory = CloningBundleFactory.create();
+
+ @Test
+ public void rootBundleSucceedsIgnoresCoder() { // root bundles bypass cloning
+ WindowedValue<Record> one = WindowedValue.valueInGlobalWindow(new Record());
+ WindowedValue<Record> two = WindowedValue.valueInGlobalWindow(new Record());
+ CommittedBundle<Record> root =
+ factory.<Record>createRootBundle().add(one).add(two).commit(Instant.now());
+
+ assertThat(root.getElements(), containsInAnyOrder(one, two));
+ }
+
+ @Test
+ public void bundleWorkingCoderSucceedsClonesOutput() {
+ TestPipeline p = TestPipeline.create();
+ PCollection<Integer> created = p.apply(Create.of(1, 3).withCoder(VarIntCoder.of()));
+ PCollection<KV<String, Integer>> kvs =
+ created
+ .apply(WithKeys.<String, Integer>of("foo"))
+ .setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
+ WindowedValue<KV<String, Integer>> fooOne = WindowedValue.valueInGlobalWindow(KV.of("foo", 1));
+ WindowedValue<KV<String, Integer>> fooThree =
+ WindowedValue.valueInGlobalWindow(KV.of("foo", 3));
+ CommittedBundle<KV<String, Integer>> bundle =
+ factory.createBundle(kvs).add(fooOne).add(fooThree).commit(Instant.now());
+
+ assertThat(bundle.getElements(), containsInAnyOrder(fooOne, fooThree)); // equal by value
+ assertThat( // ...but not the original WindowedValue instances
+ bundle.getElements(), not(containsInAnyOrder(theInstance(fooOne), theInstance(fooThree))));
+ for (WindowedValue<KV<String, Integer>> foo : bundle.getElements()) {
+ assertThat( // ...nor the original KV values
+ foo.getValue(),
+ not(anyOf(theInstance(fooOne.getValue()), theInstance(fooThree.getValue()))));
+ }
+ assertThat(bundle.getPCollection(), equalTo(kvs));
+ }
+
+ @Test
+ public void keyedBundleWorkingCoderSucceedsClonesOutput() {
+ TestPipeline p = TestPipeline.create();
+ PCollection<Integer> created = p.apply(Create.of(1, 3).withCoder(VarIntCoder.of()));
+
+ PCollection<KV<String, Iterable<Integer>>> keyed =
+ created
+ .apply(WithKeys.<String, Integer>of("foo"))
+ .setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+ .apply(GroupByKey.<String, Integer>create());
+ WindowedValue<KV<String, Iterable<Integer>>> foos =
+ WindowedValue.valueInGlobalWindow(
+ KV.<String, Iterable<Integer>>of("foo", ImmutableList.of(1, 3)));
+ CommittedBundle<KV<String, Iterable<Integer>>> keyedBundle =
+ factory
+ .createKeyedBundle(StructuralKey.of("foo", StringUtf8Coder.of()), keyed)
+ .add(foos)
+ .commit(Instant.now());
+
+ assertThat(keyedBundle.getElements(), containsInAnyOrder(foos)); // equal by value
+ assertThat( // ...but the value is a clone, not the original instance
+ Iterables.getOnlyElement(keyedBundle.getElements()).getValue(),
+ not(theInstance(foos.getValue())));
+ assertThat(keyedBundle.getPCollection(), equalTo(keyed));
+ assertThat(
+ keyedBundle.getKey(),
+ Matchers.<StructuralKey<?>>equalTo(StructuralKey.of("foo", StringUtf8Coder.of())));
+ }
+
+ @Test
+ public void bundleEncodeFailsAddFails() {
+ TestPipeline p = TestPipeline.create();
+ PCollection<Record> pc = p.apply(Create.<Record>of().withCoder(new RecordNoEncodeCoder()));
+ UncommittedBundle<Record> bundle = factory.createBundle(pc);
+
+ thrown.expect(UserCodeException.class); // add() wraps the CoderException
+ thrown.expectCause(isA(CoderException.class));
+ thrown.expectMessage("Encode not allowed");
+ bundle.add(WindowedValue.valueInGlobalWindow(new Record()));
+ }
+
+ @Test
+ public void bundleDecodeFailsAddFails() {
+ TestPipeline p = TestPipeline.create();
+ PCollection<Record> pc = p.apply(Create.<Record>of().withCoder(new RecordNoDecodeCoder()));
+ UncommittedBundle<Record> bundle = factory.createBundle(pc);
+
+ thrown.expect(UserCodeException.class); // add() wraps the CoderException
+ thrown.expectCause(isA(CoderException.class));
+ thrown.expectMessage("Decode not allowed");
+ bundle.add(WindowedValue.valueInGlobalWindow(new Record()));
+ }
+
+ @Test
+ public void keyedBundleEncodeFailsAddFails() {
+ TestPipeline p = TestPipeline.create();
+ PCollection<Record> pc = p.apply(Create.<Record>of().withCoder(new RecordNoEncodeCoder()));
+ UncommittedBundle<Record> bundle =
+ factory.createKeyedBundle(StructuralKey.of("foo", StringUtf8Coder.of()), pc);
+
+ thrown.expect(UserCodeException.class); // add() wraps the CoderException
+ thrown.expectCause(isA(CoderException.class));
+ thrown.expectMessage("Encode not allowed");
+ bundle.add(WindowedValue.valueInGlobalWindow(new Record()));
+ }
+
+ @Test
+ public void keyedBundleDecodeFailsAddFails() {
+ TestPipeline p = TestPipeline.create();
+ PCollection<Record> pc = p.apply(Create.<Record>of().withCoder(new RecordNoDecodeCoder()));
+ UncommittedBundle<Record> bundle =
+ factory.createKeyedBundle(StructuralKey.of("foo", StringUtf8Coder.of()), pc);
+
+ thrown.expect(UserCodeException.class); // add() wraps the CoderException
+ thrown.expectCause(isA(CoderException.class));
+ thrown.expectMessage("Decode not allowed");
+ bundle.add(WindowedValue.valueInGlobalWindow(new Record()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ceaa3ef/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java
index e62bf01..e6bdbd0 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java
@@ -208,8 +208,8 @@ public class EncodabilityEnforcementFactoryTest {
Collections.<CommittedBundle<?>>emptyList());
}
- private static class Record {}
- private static class RecordNoEncodeCoder extends AtomicCoder<Record> {
+ static class Record {}
+ static class RecordNoEncodeCoder extends AtomicCoder<Record> {
@Override
public void encode(
@@ -228,7 +228,7 @@ public class EncodabilityEnforcementFactoryTest {
}
}
- private static class RecordNoDecodeCoder extends AtomicCoder<Record> {
+ static class RecordNoDecodeCoder extends AtomicCoder<Record> {
@Override
public void encode(
Record value,
[2/2] incubator-beam git commit: This closes #1095
Posted by ke...@apache.org.
This closes #1095
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f603d43e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f603d43e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f603d43e
Branch: refs/heads/master
Commit: f603d43e043d383ceb00c4d786459f01d9983586
Parents: c03e3e9 2ceaa3e
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Oct 25 11:04:13 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Oct 25 11:04:13 2016 -0700
----------------------------------------------------------------------
.../runners/direct/CloningBundleFactory.java | 98 ++++++++++
.../beam/runners/direct/DirectRunner.java | 5 +-
.../direct/ImmutableListBundleFactory.java | 4 +-
.../direct/CloningBundleFactoryTest.java | 177 +++++++++++++++++++
.../EncodabilityEnforcementFactoryTest.java | 6 +-
5 files changed, 285 insertions(+), 5 deletions(-)
----------------------------------------------------------------------