You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/04/05 00:58:31 UTC
[2/3] incubator-beam git commit: Add BundleFactory, ImmutabilityCheckingBundleFactory
Add BundleFactory, ImmutabilityCheckingBundleFactory
This allows checks to be made on the contents of bundles.
ImmutabilityCheckingBundleFactory produces bundles that ensure that
elements output to a bundle are not modified after being output.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/334ab99a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/334ab99a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/334ab99a
Branch: refs/heads/master
Commit: 334ab99ab39b7f0632848b789e2c0af1782b11c0
Parents: ac314ee
Author: Thomas Groh <tg...@google.com>
Authored: Thu Mar 17 17:39:45 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Mon Apr 4 15:44:26 2016 -0700
----------------------------------------------------------------------
.../sdk/runners/inprocess/BundleFactory.java | 50 +++++
.../ExecutorServiceParallelExecutor.java | 5 +-
.../ImmutabilityCheckingBundleFactory.java | 131 +++++++++++
.../inprocess/InProcessBundleFactory.java | 157 +++++++++++++
.../inprocess/InProcessEvaluationContext.java | 18 +-
.../inprocess/InProcessPipelineRunner.java | 5 +
.../BoundedReadEvaluatorFactoryTest.java | 21 +-
.../inprocess/FlattenEvaluatorFactoryTest.java | 11 +-
.../GroupByKeyEvaluatorFactoryTest.java | 10 +-
.../ImmutabilityCheckingBundleFactoryTest.java | 220 +++++++++++++++++++
.../inprocess/InMemoryWatermarkManagerTest.java | 49 +++--
.../inprocess/InProcessBundleFactoryTest.java | 197 +++++++++++++++++
.../InProcessEvaluationContextTest.java | 11 +-
.../ParDoMultiEvaluatorFactoryTest.java | 95 ++++----
.../ParDoSingleEvaluatorFactoryTest.java | 129 ++++++-----
.../inprocess/TransformExecutorTest.java | 10 +-
.../UnboundedReadEvaluatorFactoryTest.java | 10 +-
.../inprocess/ViewEvaluatorFactoryTest.java | 5 +-
18 files changed, 980 insertions(+), 154 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BundleFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BundleFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BundleFactory.java
new file mode 100644
index 0000000..cb8a369
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BundleFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+/**
+ * A factory that creates {@link UncommittedBundle UncommittedBundles}.
+ */
+public interface BundleFactory {
+ /**
+ * Create an {@link UncommittedBundle} from an empty input. Elements added to the bundle belong to
+ * the {@code output} {@link PCollection}.
+ */
+ public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output);
+
+ /**
+ * Create an {@link UncommittedBundle} from the specified input. Elements added to the bundle
+ * belong to the {@code output} {@link PCollection}.
+ */
+ public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output);
+
+ /**
+ * Create an {@link UncommittedBundle} with the specified key from the specified input. For use by
+ * {@link InProcessGroupByKeyOnly} {@link PTransform PTransforms}. Elements added to the bundle
+ * belong to the {@code output} {@link PCollection}.
+ */
+ public <T> UncommittedBundle<T> createKeyedBundle(
+ CommittedBundle<?> input, Object key, PCollection<T> output);
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
index 628f107..9af6f97 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
@@ -374,8 +374,9 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
KeyedWorkItems.timersWorkItem(keyTimers.getKey(), delivery);
@SuppressWarnings({"unchecked", "rawtypes"})
CommittedBundle<?> bundle =
- InProcessBundle.<KeyedWorkItem<Object, Object>>keyed(
- (PCollection) transform.getInput(), keyTimers.getKey())
+ evaluationContext
+ .createKeyedBundle(
+ null, keyTimers.getKey(), (PCollection) transform.getInput())
.add(WindowedValue.valueInEmptyWindows(work))
.commit(Instant.now());
scheduleConsumption(transform, bundle, new TimerCompletionCallback(delivery));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java
new file mode 100644
index 0000000..44670e8
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.client.util.Throwables;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.CoderException;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.util.IllegalMutationException;
+import com.google.cloud.dataflow.sdk.util.MutationDetector;
+import com.google.cloud.dataflow.sdk.util.MutationDetectors;
+import com.google.cloud.dataflow.sdk.util.SerializableUtils;
+import com.google.cloud.dataflow.sdk.util.UserCodeException;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
+
+import org.joda.time.Instant;
+
+/**
+ * A {@link BundleFactory} whose bundles ensure that elements added to them are not mutated after
+ * being output. Immutability checks are enforced at the time
+ * {@link UncommittedBundle#commit(Instant)} is called, checking the value at that time against the
+ * value at the time the element was added. All elements added to the bundle will be encoded by the
+ * {@link Coder} of the underlying {@link PCollection}.
+ *
+ * <p>This catches errors during the execution of a {@link DoFn} caused by modifying an element
+ * after it is added to an output {@link PCollection}.
+ */
+class ImmutabilityCheckingBundleFactory implements BundleFactory {
+ /**
+ * Create a new {@link ImmutabilityCheckingBundleFactory} that wraps each bundle created by the
+ * {@code underlying} {@link BundleFactory} in an immutability-enforcing bundle.
+ */
+ public static ImmutabilityCheckingBundleFactory create(BundleFactory underlying) {
+ return new ImmutabilityCheckingBundleFactory(underlying);
+ }
+
+ private final BundleFactory underlying;
+
+ private ImmutabilityCheckingBundleFactory(BundleFactory underlying) {
+ this.underlying = checkNotNull(underlying);
+ }
+
+ @Override
+ public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) {
+ return new ImmutabilityEnforcingBundle<>(underlying.createRootBundle(output));
+ }
+
+ @Override
+ public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output) {
+ return new ImmutabilityEnforcingBundle<>(underlying.createBundle(input, output));
+ }
+
+ @Override
+ public <T> UncommittedBundle<T> createKeyedBundle(
+ CommittedBundle<?> input, Object key, PCollection<T> output) {
+ return new ImmutabilityEnforcingBundle<>(underlying.createKeyedBundle(input, key, output));
+ }
+
+ private static class ImmutabilityEnforcingBundle<T> implements UncommittedBundle<T> { // decorator around the real bundle
+ private final UncommittedBundle<T> underlying;
+ private final SetMultimap<WindowedValue<T>, MutationDetector> mutationDetectors; // filled by add(), verified by commit()
+ private Coder<T> coder; // assigned once, in the constructor
+
+ public ImmutabilityEnforcingBundle(UncommittedBundle<T> underlying) {
+ this.underlying = underlying;
+ mutationDetectors = HashMultimap.create();
+ coder = SerializableUtils.clone(getPCollection().getCoder()); // use a clone of the PCollection's coder
+ }
+
+ @Override
+ public PCollection<T> getPCollection() {
+ return underlying.getPCollection();
+ }
+
+ @Override
+ public UncommittedBundle<T> add(WindowedValue<T> element) {
+ try {
+ mutationDetectors.put(
+ element, MutationDetectors.forValueWithCoder(element.getValue(), coder));
+ } catch (CoderException e) {
+ throw Throwables.propagate(e);
+ }
+ underlying.add(element); // only reached when the detector was created successfully
+ return this; // return the wrapper, not the underlying bundle, so chained calls stay checked
+ }
+
+ @Override
+ public CommittedBundle<T> commit(Instant synchronizedProcessingTime) {
+ for (MutationDetector detector : mutationDetectors.values()) { // check every recorded detector before committing
+ try {
+ detector.verifyUnmodified();
+ } catch (IllegalMutationException exn) { // rethrow with the producing transform's name in the message
+ throw UserCodeException.wrap(
+ new IllegalMutationException(
+ String.format(
+ "PTransform %s mutated value %s after it was output (new value was %s)."
+ + " Values must not be mutated in any way after being output.",
+ underlying.getPCollection().getProducingTransformInternal().getFullName(),
+ exn.getSavedValue(),
+ exn.getNewValue()),
+ exn.getSavedValue(),
+ exn.getNewValue(),
+ exn));
+ }
+ }
+ return underlying.commit(synchronizedProcessingTime); // the underlying bundle is committed only if no check threw
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactory.java
new file mode 100644
index 0000000..7ca1b60
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactory.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+
+import org.joda.time.Instant;
+
+import javax.annotation.Nullable;
+
+/**
+ * A factory that produces bundles that perform no additional validation.
+ */
+class InProcessBundleFactory implements BundleFactory {
+ public static InProcessBundleFactory create() {
+ return new InProcessBundleFactory();
+ }
+
+ private InProcessBundleFactory() {}
+
+ @Override
+ public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) {
+ return InProcessBundle.unkeyed(output);
+ }
+
+ @Override
+ public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output) {
+ return input.isKeyed() // the output bundle is keyed exactly when the input bundle is
+ ? InProcessBundle.keyed(output, input.getKey())
+ : InProcessBundle.unkeyed(output);
+ }
+
+ @Override
+ public <T> UncommittedBundle<T> createKeyedBundle(
+ CommittedBundle<?> input, Object key, PCollection<T> output) {
+ return InProcessBundle.keyed(output, key); // input is never read here
+ }
+
+ /**
+ * A {@link UncommittedBundle} that buffers elements in memory.
+ */
+ private static final class InProcessBundle<T> implements UncommittedBundle<T> {
+ private final PCollection<T> pcollection;
+ private final boolean keyed;
+ private final Object key;
+ private boolean committed = false; // set by commit(); guards add() and commit()
+ private ImmutableList.Builder<WindowedValue<T>> elements;
+
+ /**
+ * Create a new {@link InProcessBundle} for the specified {@link PCollection} without a key.
+ */
+ public static <T> InProcessBundle<T> unkeyed(PCollection<T> pcollection) {
+ return new InProcessBundle<T>(pcollection, false, null);
+ }
+
+ /**
+ * Create a new {@link InProcessBundle} for the specified {@link PCollection} with the specified
+ * key.
+ *
+ * <p>See {@link CommittedBundle#getKey()} and {@link CommittedBundle#isKeyed()} for more
+ * information.
+ */
+ public static <T> InProcessBundle<T> keyed(PCollection<T> pcollection, Object key) {
+ return new InProcessBundle<T>(pcollection, true, key);
+ }
+
+ private InProcessBundle(PCollection<T> pcollection, boolean keyed, Object key) {
+ this.pcollection = pcollection;
+ this.keyed = keyed;
+ this.key = key;
+ this.elements = ImmutableList.builder();
+ }
+
+ @Override
+ public PCollection<T> getPCollection() {
+ return pcollection;
+ }
+
+ @Override
+ public InProcessBundle<T> add(WindowedValue<T> element) {
+ checkState(
+ !committed,
+ "Can't add element %s to committed bundle in PCollection %s",
+ element,
+ pcollection);
+ elements.add(element);
+ return this;
+ }
+
+ @Override
+ public CommittedBundle<T> commit(final Instant synchronizedCompletionTime) {
+ checkState(!committed, "Can't commit already committed bundle %s", this);
+ committed = true; // later add() and commit() calls fail their checkState
+ final Iterable<WindowedValue<T>> committedElements = elements.build(); // built once and returned by every getElements() call
+ return new CommittedBundle<T>() {
+ @Override
+ @Nullable
+ public Object getKey() {
+ return key;
+ }
+
+ @Override
+ public boolean isKeyed() {
+ return keyed;
+ }
+
+ @Override
+ public Iterable<WindowedValue<T>> getElements() {
+ return committedElements;
+ }
+
+ @Override
+ public PCollection<T> getPCollection() {
+ return pcollection;
+ }
+
+ @Override
+ public Instant getSynchronizedProcessingOutputWatermark() {
+ return synchronizedCompletionTime; // the value passed to commit()
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .omitNullValues()
+ .add("pcollection", pcollection)
+ .add("key", key)
+ .add("elements", committedElements)
+ .toString();
+ }
+ };
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java
index dcbbf40..078827d 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java
@@ -77,6 +77,7 @@ class InProcessEvaluationContext {
/** The options that were used to create this {@link Pipeline}. */
private final InProcessPipelineOptions options;
+ private final BundleFactory bundleFactory;
/** The current processing time and event time watermarks and timers. */
private final InMemoryWatermarkManager watermarkManager;
@@ -93,21 +94,24 @@ class InProcessEvaluationContext {
public static InProcessEvaluationContext create(
InProcessPipelineOptions options,
+ BundleFactory bundleFactory,
Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
Map<AppliedPTransform<?, ?, ?>, String> stepNames,
Collection<PCollectionView<?>> views) {
return new InProcessEvaluationContext(
- options, rootTransforms, valueToConsumers, stepNames, views);
+ options, bundleFactory, rootTransforms, valueToConsumers, stepNames, views);
}
private InProcessEvaluationContext(
InProcessPipelineOptions options,
+ BundleFactory bundleFactory,
Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
Map<AppliedPTransform<?, ?, ?>, String> stepNames,
Collection<PCollectionView<?>> views) {
this.options = checkNotNull(options);
+ this.bundleFactory = checkNotNull(bundleFactory);
checkNotNull(rootTransforms);
checkNotNull(valueToConsumers);
checkNotNull(stepNames);
@@ -207,7 +211,7 @@ class InProcessEvaluationContext {
* Create a {@link UncommittedBundle} for use by a source.
*/
public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) {
- return InProcessBundle.unkeyed(output);
+ return bundleFactory.createRootBundle(output);
}
/**
@@ -215,9 +219,7 @@ class InProcessEvaluationContext {
* PCollection}.
*/
public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output) {
- return input.isKeyed()
- ? InProcessBundle.keyed(output, input.getKey())
- : InProcessBundle.unkeyed(output);
+ return bundleFactory.createBundle(input, output);
}
/**
@@ -226,7 +228,7 @@ class InProcessEvaluationContext {
*/
public <T> UncommittedBundle<T> createKeyedBundle(
CommittedBundle<?> input, Object key, PCollection<T> output) {
- return InProcessBundle.keyed(output, key);
+ return bundleFactory.createKeyedBundle(input, key, output);
}
/**
@@ -355,7 +357,9 @@ class InProcessEvaluationContext {
* for each time they are set.
*/
public Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> extractFiredTimers() {
- return watermarkManager.extractFiredTimers();
+ Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> fired =
+ watermarkManager.extractFiredTimers();
+ return fired;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
index 8123711..4fb01b7 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
@@ -232,6 +232,7 @@ public class InProcessPipelineRunner
InProcessEvaluationContext context =
InProcessEvaluationContext.create(
getPipelineOptions(),
+ createBundleFactory(getPipelineOptions()),
consumerTrackingVisitor.getRootTransforms(),
consumerTrackingVisitor.getValueToConsumers(),
consumerTrackingVisitor.getStepNames(),
@@ -271,6 +272,10 @@ public class InProcessPipelineRunner
return Collections.emptyMap();
}
+ private BundleFactory createBundleFactory(InProcessPipelineOptions pipelineOptions) {
+ return InProcessBundleFactory.create();
+ }
+
/**
* The result of running a {@link Pipeline} with the {@link InProcessPipelineRunner}.
*
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java
index ebece5f..8e92caf 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java
@@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder;
@@ -48,6 +47,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
import java.io.IOException;
import java.util.Arrays;
@@ -63,20 +63,22 @@ public class BoundedReadEvaluatorFactoryTest {
private PCollection<Long> longs;
private TransformEvaluatorFactory factory;
@Mock private InProcessEvaluationContext context;
+ private BundleFactory bundleFactory;
@Before
public void setup() {
+ MockitoAnnotations.initMocks(this);
source = CountingSource.upTo(10L);
TestPipeline p = TestPipeline.create();
longs = p.apply(Read.from(source));
factory = new BoundedReadEvaluatorFactory();
- context = mock(InProcessEvaluationContext.class);
+ bundleFactory = InProcessBundleFactory.create();
}
@Test
public void boundedSourceInMemoryTransformEvaluatorProducesElements() throws Exception {
- UncommittedBundle<Long> output = InProcessBundle.unkeyed(longs);
+ UncommittedBundle<Long> output = bundleFactory.createRootBundle(longs);
when(context.createRootBundle(longs)).thenReturn(output);
TransformEvaluator<?> evaluator =
@@ -96,8 +98,7 @@ public class BoundedReadEvaluatorFactoryTest {
*/
@Test
public void boundedSourceInMemoryTransformEvaluatorAfterFinishIsEmpty() throws Exception {
- UncommittedBundle<Long> output =
- InProcessBundle.unkeyed(longs);
+ UncommittedBundle<Long> output = bundleFactory.createRootBundle(longs);
when(context.createRootBundle(longs)).thenReturn(output);
TransformEvaluator<?> evaluator =
@@ -111,7 +112,7 @@ public class BoundedReadEvaluatorFactoryTest {
containsInAnyOrder(
gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L)));
- UncommittedBundle<Long> secondOutput = InProcessBundle.unkeyed(longs);
+ UncommittedBundle<Long> secondOutput = bundleFactory.createRootBundle(longs);
when(context.createRootBundle(longs)).thenReturn(secondOutput);
TransformEvaluator<?> secondEvaluator =
factory.forApplication(longs.getProducingTransformInternal(), null, context);
@@ -132,8 +133,8 @@ public class BoundedReadEvaluatorFactoryTest {
*/
@Test
public void boundedSourceEvaluatorSimultaneousEvaluations() throws Exception {
- UncommittedBundle<Long> output = InProcessBundle.unkeyed(longs);
- UncommittedBundle<Long> secondOutput = InProcessBundle.unkeyed(longs);
+ UncommittedBundle<Long> output = bundleFactory.createRootBundle(longs);
+ UncommittedBundle<Long> secondOutput = bundleFactory.createRootBundle(longs);
when(context.createRootBundle(longs)).thenReturn(output).thenReturn(secondOutput);
// create both evaluators before finishing either.
@@ -171,7 +172,7 @@ public class BoundedReadEvaluatorFactoryTest {
PCollection<Long> pcollection = p.apply(Read.from(source));
AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
- UncommittedBundle<Long> output = InProcessBundle.unkeyed(longs);
+ UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection);
when(context.createRootBundle(pcollection)).thenReturn(output);
TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context);
@@ -189,7 +190,7 @@ public class BoundedReadEvaluatorFactoryTest {
PCollection<Long> pcollection = p.apply(Read.from(source));
AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
- UncommittedBundle<Long> output = InProcessBundle.unkeyed(longs);
+ UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection);
when(context.createRootBundle(pcollection)).thenReturn(output);
TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java
index 39cc54a..f93abd8 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java
@@ -45,6 +45,7 @@ import org.junit.runners.JUnit4;
*/
@RunWith(JUnit4.class)
public class FlattenEvaluatorFactoryTest {
+ private BundleFactory bundleFactory = InProcessBundleFactory.create();
@Test
public void testFlattenInMemoryEvaluator() throws Exception {
TestPipeline p = TestPipeline.create();
@@ -54,13 +55,15 @@ public class FlattenEvaluatorFactoryTest {
PCollection<Integer> flattened = list.apply(Flatten.<Integer>pCollections());
- CommittedBundle<Integer> leftBundle = InProcessBundle.unkeyed(left).commit(Instant.now());
- CommittedBundle<Integer> rightBundle = InProcessBundle.unkeyed(right).commit(Instant.now());
+ CommittedBundle<Integer> leftBundle =
+ bundleFactory.createRootBundle(left).commit(Instant.now());
+ CommittedBundle<Integer> rightBundle =
+ bundleFactory.createRootBundle(right).commit(Instant.now());
InProcessEvaluationContext context = mock(InProcessEvaluationContext.class);
- UncommittedBundle<Integer> flattenedLeftBundle = InProcessBundle.unkeyed(flattened);
- UncommittedBundle<Integer> flattenedRightBundle = InProcessBundle.unkeyed(flattened);
+ UncommittedBundle<Integer> flattenedLeftBundle = bundleFactory.createRootBundle(flattened);
+ UncommittedBundle<Integer> flattenedRightBundle = bundleFactory.createRootBundle(flattened);
when(context.createBundle(leftBundle, flattened)).thenReturn(flattenedLeftBundle);
when(context.createBundle(rightBundle, flattened)).thenReturn(flattenedRightBundle);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
index 9c0957d..e80125f 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
@@ -50,6 +50,8 @@ import org.junit.runners.JUnit4;
*/
@RunWith(JUnit4.class)
public class GroupByKeyEvaluatorFactoryTest {
+ private BundleFactory bundleFactory = InProcessBundleFactory.create();
+
@Test
public void testInMemoryEvaluator() throws Exception {
TestPipeline p = TestPipeline.create();
@@ -67,15 +69,15 @@ public class GroupByKeyEvaluatorFactoryTest {
kvs.apply(new GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly<String, Integer>());
CommittedBundle<KV<String, WindowedValue<Integer>>> inputBundle =
- InProcessBundle.unkeyed(kvs).commit(Instant.now());
+ bundleFactory.createRootBundle(kvs).commit(Instant.now());
InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
UncommittedBundle<KeyedWorkItem<String, Integer>> fooBundle =
- InProcessBundle.keyed(groupedKvs, "foo");
+ bundleFactory.createKeyedBundle(null, "foo", groupedKvs);
UncommittedBundle<KeyedWorkItem<String, Integer>> barBundle =
- InProcessBundle.keyed(groupedKvs, "bar");
+ bundleFactory.createKeyedBundle(null, "bar", groupedKvs);
UncommittedBundle<KeyedWorkItem<String, Integer>> bazBundle =
- InProcessBundle.keyed(groupedKvs, "baz");
+ bundleFactory.createKeyedBundle(null, "baz", groupedKvs);
when(evaluationContext.createKeyedBundle(inputBundle, "foo", groupedKvs)).thenReturn(fooBundle);
when(evaluationContext.createKeyedBundle(inputBundle, "bar", groupedKvs)).thenReturn(barBundle);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java
new file mode 100644
index 0000000..40b1d5a
--- /dev/null
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.isA;
+import static org.junit.Assert.assertThat;
+
+import com.google.cloud.dataflow.sdk.coders.ByteArrayCoder;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
+import com.google.cloud.dataflow.sdk.util.IllegalMutationException;
+import com.google.cloud.dataflow.sdk.util.UserCodeException;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link ImmutabilityCheckingBundleFactory}.
+ */
+@RunWith(JUnit4.class)
+public class ImmutabilityCheckingBundleFactoryTest {
+ @Rule public ExpectedException thrown = ExpectedException.none();
+ private ImmutabilityCheckingBundleFactory factory;
+ private PCollection<byte[]> created;
+ private PCollection<byte[]> transformed;
+
+ @Before
+ public void setup() {
+ TestPipeline p = TestPipeline.create();
+ created = p.apply(Create.<byte[]>of().withCoder(ByteArrayCoder.of()));
+ transformed = created.apply(ParDo.of(new IdentityDoFn<byte[]>()));
+ factory = ImmutabilityCheckingBundleFactory.create(InProcessBundleFactory.create());
+ }
+
+ @Test
+ public void noMutationRootBundleSucceeds() {
+ UncommittedBundle<byte[]> root = factory.createRootBundle(created);
+ byte[] array = new byte[] {0, 1, 2};
+ root.add(WindowedValue.valueInGlobalWindow(array));
+ CommittedBundle<byte[]> committed = root.commit(Instant.now());
+
+ assertThat(
+ committed.getElements(), containsInAnyOrder(WindowedValue.valueInGlobalWindow(array)));
+ }
+
+ @Test
+ public void noMutationKeyedBundleSucceeds() {
+ CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now());
+ UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root, "mykey", transformed);
+
+ WindowedValue<byte[]> windowedArray =
+ WindowedValue.of(
+ new byte[] {4, 8, 12},
+ new Instant(891L),
+ new IntervalWindow(new Instant(0), new Instant(1000)),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING);
+ keyed.add(windowedArray);
+
+ CommittedBundle<byte[]> committed = keyed.commit(Instant.now());
+ assertThat(committed.getElements(), containsInAnyOrder(windowedArray));
+ }
+
+ @Test
+ public void noMutationCreateBundleSucceeds() {
+ CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now());
+ UncommittedBundle<byte[]> intermediate = factory.createBundle(root, transformed);
+
+ WindowedValue<byte[]> windowedArray =
+ WindowedValue.of(
+ new byte[] {4, 8, 12},
+ new Instant(891L),
+ new IntervalWindow(new Instant(0), new Instant(1000)),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING);
+ intermediate.add(windowedArray);
+
+ CommittedBundle<byte[]> committed = intermediate.commit(Instant.now());
+ assertThat(committed.getElements(), containsInAnyOrder(windowedArray));
+ }
+
+ @Test
+ public void mutationBeforeAddRootBundleSucceeds() {
+ UncommittedBundle<byte[]> root = factory.createRootBundle(created);
+ byte[] array = new byte[] {0, 1, 2};
+ array[1] = 2;
+ root.add(WindowedValue.valueInGlobalWindow(array));
+ CommittedBundle<byte[]> committed = root.commit(Instant.now());
+
+ assertThat(
+ committed.getElements(), containsInAnyOrder(WindowedValue.valueInGlobalWindow(array)));
+ }
+
+ @Test
+ public void mutationBeforeAddKeyedBundleSucceeds() {
+ CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now());
+ UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root, "mykey", transformed);
+
+ byte[] array = new byte[] {4, 8, 12};
+ array[0] = Byte.MAX_VALUE;
+ WindowedValue<byte[]> windowedArray =
+ WindowedValue.of(
+ array,
+ new Instant(891L),
+ new IntervalWindow(new Instant(0), new Instant(1000)),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING);
+ keyed.add(windowedArray);
+
+ CommittedBundle<byte[]> committed = keyed.commit(Instant.now());
+ assertThat(committed.getElements(), containsInAnyOrder(windowedArray));
+ }
+
+ @Test
+ public void mutationBeforeAddCreateBundleSucceeds() {
+ CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now());
+ UncommittedBundle<byte[]> intermediate = factory.createBundle(root, transformed);
+
+ byte[] array = new byte[] {4, 8, 12};
+ WindowedValue<byte[]> windowedArray =
+ WindowedValue.of(
+ array,
+ new Instant(891L),
+ new IntervalWindow(new Instant(0), new Instant(1000)),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING);
+ array[2] = -3;
+ intermediate.add(windowedArray);
+
+ CommittedBundle<byte[]> committed = intermediate.commit(Instant.now());
+ assertThat(committed.getElements(), containsInAnyOrder(windowedArray));
+ }
+
+ @Test
+ public void mutationAfterAddRootBundleThrows() {
+ UncommittedBundle<byte[]> root = factory.createRootBundle(created);
+ byte[] array = new byte[] {0, 1, 2};
+ root.add(WindowedValue.valueInGlobalWindow(array));
+
+ array[1] = 2;
+ thrown.expect(UserCodeException.class);
+ thrown.expectCause(isA(IllegalMutationException.class));
+ thrown.expectMessage("Values must not be mutated in any way after being output");
+ CommittedBundle<byte[]> committed = root.commit(Instant.now());
+ }
+
+ @Test
+ public void mutationAfterAddKeyedBundleThrows() {
+ CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now());
+ UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root, "mykey", transformed);
+
+ byte[] array = new byte[] {4, 8, 12};
+ WindowedValue<byte[]> windowedArray =
+ WindowedValue.of(
+ array,
+ new Instant(891L),
+ new IntervalWindow(new Instant(0), new Instant(1000)),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING);
+ keyed.add(windowedArray);
+
+ array[0] = Byte.MAX_VALUE;
+ thrown.expect(UserCodeException.class);
+ thrown.expectCause(isA(IllegalMutationException.class));
+ thrown.expectMessage("Values must not be mutated in any way after being output");
+ CommittedBundle<byte[]> committed = keyed.commit(Instant.now());
+ }
+
+ @Test
+ public void mutationAfterAddCreateBundleThrows() {
+ CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now());
+ UncommittedBundle<byte[]> intermediate = factory.createBundle(root, transformed);
+
+ byte[] array = new byte[] {4, 8, 12};
+ WindowedValue<byte[]> windowedArray =
+ WindowedValue.of(
+ array,
+ new Instant(891L),
+ new IntervalWindow(new Instant(0), new Instant(1000)),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING);
+ intermediate.add(windowedArray);
+
+ array[2] = -3;
+ thrown.expect(UserCodeException.class);
+ thrown.expectCause(isA(IllegalMutationException.class));
+ thrown.expectMessage("Values must not be mutated in any way after being output");
+ CommittedBundle<byte[]> committed = intermediate.commit(Instant.now());
+ }
+
+ private static class IdentityDoFn<T> extends DoFn<T, T> {
+ @Override
+ public void processElement(DoFn<T, T>.ProcessContext c) throws Exception {
+ c.output(c.element());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java
index 140ac05..93d2a42 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java
@@ -84,6 +84,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
private transient PCollection<Integer> flattened;
private transient InMemoryWatermarkManager manager;
+ private transient BundleFactory bundleFactory;
@Before
public void setup() {
@@ -131,6 +132,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
clock = MockClock.fromInstant(new Instant(1000));
manager = InMemoryWatermarkManager.create(clock, rootTransforms, consumers);
+ bundleFactory = InProcessBundleFactory.create();
}
/**
@@ -246,7 +248,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
assertThat(withBufferedElements.getOutputWatermark(), equalTo(firstCollectionTimestamp));
CommittedBundle<?> completedFlattenBundle =
- InProcessBundle.unkeyed(flattened).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
+ bundleFactory.createRootBundle(flattened).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.updateWatermarks(firstPcollectionBundle, flattened.getProducingTransformInternal(),
TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(completedFlattenBundle),
null);
@@ -342,13 +344,13 @@ public class InMemoryWatermarkManagerTest implements Serializable {
@Test
public void updateWatermarkWithKeyedWatermarkHolds() {
CommittedBundle<Integer> firstKeyBundle =
- InProcessBundle.keyed(createdInts, "Odd")
+ bundleFactory.createKeyedBundle(null, "Odd", createdInts)
.add(WindowedValue.timestampedValueInGlobalWindow(1, new Instant(1_000_000L)))
.add(WindowedValue.timestampedValueInGlobalWindow(3, new Instant(-1000L)))
.commit(clock.now());
CommittedBundle<Integer> secondKeyBundle =
- InProcessBundle.keyed(createdInts, "Even")
+ bundleFactory.createKeyedBundle(null, "Even", createdInts)
.add(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1234L)))
.commit(clock.now());
@@ -368,7 +370,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
assertThat(filteredWatermarks.getOutputWatermark(), not(laterThan(new Instant(-1000L))));
CommittedBundle<Integer> fauxFirstKeyTimerBundle =
- InProcessBundle.keyed(createdInts, "Odd").commit(clock.now());
+ bundleFactory.createKeyedBundle(null, "Odd", createdInts).commit(clock.now());
manager.updateWatermarks(fauxFirstKeyTimerBundle, filtered.getProducingTransformInternal(),
TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(),
BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -376,7 +378,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(1234L)));
CommittedBundle<Integer> fauxSecondKeyTimerBundle =
- InProcessBundle.keyed(createdInts, "Even").commit(clock.now());
+ bundleFactory.createKeyedBundle(null, "Even", createdInts).commit(clock.now());
manager.updateWatermarks(fauxSecondKeyTimerBundle, filtered.getProducingTransformInternal(),
TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(), new Instant(5678L));
assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(5678L)));
@@ -396,7 +398,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
@Test
public void updateOutputWatermarkShouldBeMonotonic() {
CommittedBundle<?> firstInput =
- InProcessBundle.unkeyed(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
+ bundleFactory.createRootBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
Collections.<CommittedBundle<?>>singleton(firstInput), new Instant(0L));
TransformWatermarks firstWatermarks =
@@ -404,7 +406,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
assertThat(firstWatermarks.getOutputWatermark(), equalTo(new Instant(0L)));
CommittedBundle<?> secondInput =
- InProcessBundle.unkeyed(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
+ bundleFactory.createRootBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
Collections.<CommittedBundle<?>>singleton(secondInput), new Instant(-250L));
TransformWatermarks secondWatermarks =
@@ -579,7 +581,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
CommittedBundle<Integer> createOutput =
- InProcessBundle.unkeyed(createdInts).commit(new Instant(1250L));
+ bundleFactory.createRootBundle(createdInts).commit(new Instant(1250L));
manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
Collections.<CommittedBundle<?>>singleton(createOutput), BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -606,7 +608,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
not(laterThan(new Instant(1250L))));
CommittedBundle<?> filterOutputBundle =
- InProcessBundle.unkeyed(intsToFlatten).commit(new Instant(1250L));
+ bundleFactory.createRootBundle(intsToFlatten).commit(new Instant(1250L));
manager.updateWatermarks(createOutput, filtered.getProducingTransformInternal(),
TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(filterOutputBundle),
BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -673,9 +675,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
assertThat(filteredDoubledWms.getSynchronizedProcessingOutputTime(), not(laterThan(startTime)));
CommittedBundle<Integer> filteredTimerBundle =
- InProcessBundle.keyed(filtered, "key").commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
+ bundleFactory
+ .createKeyedBundle(null, "key", filtered)
+ .commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
CommittedBundle<Integer> filteredTimerResult =
- InProcessBundle.keyed(filteredTimesTwo, "key")
+ bundleFactory.createKeyedBundle(null, "key", filteredTimesTwo)
.commit(filteredWms.getSynchronizedProcessingOutputTime());
// Complete the processing time timer
manager.updateWatermarks(filteredTimerBundle, filtered.getProducingTransformInternal(),
@@ -725,7 +729,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
CommittedBundle<Integer> createOutput =
- InProcessBundle.unkeyed(createdInts).commit(new Instant(1250L));
+ bundleFactory.createRootBundle(createdInts).commit(new Instant(1250L));
manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
Collections.<CommittedBundle<?>>singleton(createOutput), BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -736,7 +740,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
createAfterUpdate.getSynchronizedProcessingOutputTime(), not(laterThan(clock.now())));
CommittedBundle<Integer> createSecondOutput =
- InProcessBundle.unkeyed(createdInts).commit(new Instant(750L));
+ bundleFactory.createRootBundle(createdInts).commit(new Instant(750L));
manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
Collections.<CommittedBundle<?>>singleton(createSecondOutput),
BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -784,13 +788,20 @@ public class InMemoryWatermarkManagerTest implements Serializable {
@Test
public void synchronizedProcessingInputTimeIsHeldToPendingBundleTimes() {
CommittedBundle<Integer> created = globallyWindowedBundle(createdInts, 1, 2, 3);
- manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
- Collections.<CommittedBundle<?>>singleton(created), new Instant(29_919_235L));
+ manager.updateWatermarks(
+ null,
+ createdInts.getProducingTransformInternal(),
+ TimerUpdate.empty(),
+ Collections.<CommittedBundle<?>>singleton(created),
+ new Instant(29_919_235L));
Instant upstreamHold = new Instant(2048L);
CommittedBundle<Integer> filteredBundle =
- InProcessBundle.keyed(filtered, "key").commit(upstreamHold);
- manager.updateWatermarks(created, filtered.getProducingTransformInternal(), TimerUpdate.empty(),
+ bundleFactory.createKeyedBundle(null, "key", filtered).commit(upstreamHold);
+ manager.updateWatermarks(
+ created,
+ filtered.getProducingTransformInternal(),
+ TimerUpdate.empty(),
Collections.<CommittedBundle<?>>singleton(filteredBundle),
BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -1094,7 +1105,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
@SafeVarargs
private final <T> CommittedBundle<T> timestampedBundle(
PCollection<T> pc, TimestampedValue<T>... values) {
- UncommittedBundle<T> bundle = InProcessBundle.unkeyed(pc);
+ UncommittedBundle<T> bundle = bundleFactory.createRootBundle(pc);
for (TimestampedValue<T> value : values) {
bundle.add(
WindowedValue.timestampedValueInGlobalWindow(value.getValue(), value.getTimestamp()));
@@ -1104,7 +1115,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
@SafeVarargs
private final <T> CommittedBundle<T> globallyWindowedBundle(PCollection<T> pc, T... values) {
- UncommittedBundle<T> bundle = InProcessBundle.unkeyed(pc);
+ UncommittedBundle<T> bundle = bundleFactory.createRootBundle(pc);
for (T value : values) {
bundle.add(WindowedValue.valueInGlobalWindow(value));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactoryTest.java
new file mode 100644
index 0000000..060d43c
--- /dev/null
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactoryTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.WithKeys;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.collect.ImmutableList;
+
+import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Tests for {@link InProcessBundleFactory}.
+ */
+@RunWith(JUnit4.class)
+public class InProcessBundleFactoryTest {
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ private InProcessBundleFactory bundleFactory = InProcessBundleFactory.create();
+
+ private PCollection<Integer> created;
+ private PCollection<KV<String, Integer>> downstream;
+
+ @Before
+ public void setup() {
+ TestPipeline p = TestPipeline.create();
+ created = p.apply(Create.of(1, 2, 3));
+ downstream = created.apply(WithKeys.<String, Integer>of("foo"));
+ }
+
+ @Test
+ public void createRootBundleShouldCreateWithNullKey() {
+ PCollection<Integer> pcollection = TestPipeline.create().apply(Create.of(1));
+
+ UncommittedBundle<Integer> inFlightBundle = bundleFactory.createRootBundle(pcollection);
+
+ CommittedBundle<Integer> bundle = inFlightBundle.commit(Instant.now());
+
+ assertThat(bundle.isKeyed(), is(false));
+ assertThat(bundle.getKey(), nullValue());
+ }
+
+ private void createKeyedBundle(Object key) {
+ PCollection<Integer> pcollection = TestPipeline.create().apply(Create.of(1));
+
+ UncommittedBundle<Integer> inFlightBundle =
+ bundleFactory.createKeyedBundle(null, key, pcollection);
+
+ CommittedBundle<Integer> bundle = inFlightBundle.commit(Instant.now());
+ assertThat(bundle.isKeyed(), is(true));
+ assertThat(bundle.getKey(), equalTo(key));
+ }
+
+ @Test
+ public void keyedWithNullKeyShouldCreateKeyedBundle() {
+ createKeyedBundle(null);
+ }
+
+ @Test
+ public void keyedWithKeyShouldCreateKeyedBundle() {
+ createKeyedBundle(new Object());
+ }
+
+ private <T> void afterCommitGetElementsShouldHaveAddedElements(Iterable<WindowedValue<T>> elems) {
+ PCollection<T> pcollection = TestPipeline.create().apply(Create.<T>of());
+
+ UncommittedBundle<T> bundle = bundleFactory.createRootBundle(pcollection);
+ Collection<Matcher<? super WindowedValue<T>>> expectations = new ArrayList<>();
+ for (WindowedValue<T> elem : elems) {
+ bundle.add(elem);
+ expectations.add(equalTo(elem));
+ }
+ Matcher<Iterable<? extends WindowedValue<T>>> containsMatcher =
+ Matchers.<WindowedValue<T>>containsInAnyOrder(expectations);
+ assertThat(bundle.commit(Instant.now()).getElements(), containsMatcher);
+ }
+
+ @Test
+ public void getElementsBeforeAddShouldReturnEmptyIterable() {
+ afterCommitGetElementsShouldHaveAddedElements(Collections.<WindowedValue<Integer>>emptyList());
+ }
+
+ @Test
+ public void getElementsAfterAddShouldReturnAddedElements() {
+ WindowedValue<Integer> firstValue = WindowedValue.valueInGlobalWindow(1);
+ WindowedValue<Integer> secondValue =
+ WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1000L));
+
+ afterCommitGetElementsShouldHaveAddedElements(ImmutableList.of(firstValue, secondValue));
+ }
+
+ @Test
+ public void addAfterCommitShouldThrowException() {
+ PCollection<Integer> pcollection = TestPipeline.create().apply(Create.<Integer>of());
+
+ UncommittedBundle<Integer> bundle = bundleFactory.createRootBundle(pcollection);
+ bundle.add(WindowedValue.valueInGlobalWindow(1));
+ CommittedBundle<Integer> firstCommit = bundle.commit(Instant.now());
+ assertThat(firstCommit.getElements(), containsInAnyOrder(WindowedValue.valueInGlobalWindow(1)));
+
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage("3");
+ thrown.expectMessage("committed");
+
+ bundle.add(WindowedValue.valueInGlobalWindow(3));
+ }
+
+ @Test
+ public void commitAfterCommitShouldThrowException() {
+ PCollection<Integer> pcollection = TestPipeline.create().apply(Create.<Integer>of());
+
+ UncommittedBundle<Integer> bundle = bundleFactory.createRootBundle(pcollection);
+ bundle.add(WindowedValue.valueInGlobalWindow(1));
+ CommittedBundle<Integer> firstCommit = bundle.commit(Instant.now());
+ assertThat(firstCommit.getElements(), containsInAnyOrder(WindowedValue.valueInGlobalWindow(1)));
+
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage("committed");
+
+ bundle.commit(Instant.now());
+ }
+
+ @Test
+ public void createBundleUnkeyedResultUnkeyed() {
+ CommittedBundle<KV<String, Integer>> newBundle =
+ bundleFactory
+ .createBundle(bundleFactory.createRootBundle(created).commit(Instant.now()), downstream)
+ .commit(Instant.now());
+ assertThat(newBundle.isKeyed(), is(false));
+ }
+
+ @Test
+ public void createBundleKeyedResultPropagatesKey() {
+ CommittedBundle<KV<String, Integer>> newBundle =
+ bundleFactory
+ .createBundle(
+ bundleFactory.createKeyedBundle(null, "foo", created).commit(Instant.now()),
+ downstream)
+ .commit(Instant.now());
+ assertThat(newBundle.isKeyed(), is(true));
+ assertThat(newBundle.getKey(), Matchers.<Object>equalTo("foo"));
+ }
+
+ @Test
+ public void createRootBundleUnkeyed() {
+ assertThat(bundleFactory.createRootBundle(created).commit(Instant.now()).isKeyed(), is(false));
+ }
+
+ @Test
+ public void createKeyedBundleKeyed() {
+ CommittedBundle<KV<String, Integer>> keyedBundle =
+ bundleFactory
+ .createKeyedBundle(
+ bundleFactory.createRootBundle(created).commit(Instant.now()), "foo", downstream)
+ .commit(Instant.now());
+ assertThat(keyedBundle.isKeyed(), is(true));
+ assertThat(keyedBundle.getKey(), Matchers.<Object>equalTo("foo"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
index 564f3f2..fde2cb4 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
@@ -124,6 +124,7 @@ public class InProcessEvaluationContextTest {
Collection<PCollectionView<?>> views = ImmutableList.<PCollectionView<?>>of(view);
context = InProcessEvaluationContext.create(
runner.getPipelineOptions(),
+ InProcessBundleFactory.create(),
rootTransforms,
valueToConsumers,
stepNames,
@@ -170,7 +171,9 @@ public class InProcessEvaluationContextTest {
stepContext.stateInternals().state(StateNamespaces.global(), intBag).add(1);
context.handleResult(
- InProcessBundle.keyed(created, "foo").commit(Instant.now()),
+ InProcessBundleFactory.create()
+ .createKeyedBundle(null, "foo", created)
+ .commit(Instant.now()),
ImmutableList.<TimerData>of(),
StepTransformResult.withoutHold(created.getProducingTransformInternal())
.withState(stepContext.commitState())
@@ -262,7 +265,7 @@ public class InProcessEvaluationContextTest {
.withCounters(againCounters)
.build();
context.handleResult(
- InProcessBundle.unkeyed(created).commit(Instant.now()),
+ context.createRootBundle(created).commit(Instant.now()),
ImmutableList.<TimerData>of(),
secondResult);
assertThat((Long) context.getCounters().getExistingCounter("foo").getAggregate(), equalTo(12L));
@@ -289,7 +292,7 @@ public class InProcessEvaluationContextTest {
.build();
context.handleResult(
- InProcessBundle.keyed(created, myKey).commit(Instant.now()),
+ context.createKeyedBundle(null, myKey, created).commit(Instant.now()),
ImmutableList.<TimerData>of(),
stateResult);
@@ -371,7 +374,7 @@ public class InProcessEvaluationContextTest {
// haven't added any timers, must be empty
assertThat(context.extractFiredTimers().entrySet(), emptyIterable());
context.handleResult(
- InProcessBundle.keyed(created, key).commit(Instant.now()),
+ context.createKeyedBundle(null, key, created).commit(Instant.now()),
ImmutableList.<TimerData>of(),
timerResult);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java
index 664161c..87247ac 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java
@@ -69,6 +69,8 @@ import java.io.Serializable;
*/
@RunWith(JUnit4.class)
public class ParDoMultiEvaluatorFactoryTest implements Serializable {
+ private transient BundleFactory bundleFactory = InProcessBundleFactory.create();
+
@Test
public void testParDoMultiInMemoryTransformEvaluator() throws Exception {
TestPipeline p = TestPipeline.create();
@@ -80,26 +82,30 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
final TupleTag<Integer> lengthTag = new TupleTag<>();
BoundMulti<String, KV<String, Integer>> pardo =
- ParDo.of(new DoFn<String, KV<String, Integer>>() {
- @Override
- public void processElement(ProcessContext c) {
- c.output(KV.<String, Integer>of(c.element(), c.element().length()));
- c.sideOutput(elementTag, c.element());
- c.sideOutput(lengthTag, c.element().length());
- }
- }).withOutputTags(mainOutputTag, TupleTagList.of(elementTag).and(lengthTag));
+ ParDo.of(
+ new DoFn<String, KV<String, Integer>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(KV.<String, Integer>of(c.element(), c.element().length()));
+ c.sideOutput(elementTag, c.element());
+ c.sideOutput(lengthTag, c.element().length());
+ }
+ })
+ .withOutputTags(mainOutputTag, TupleTagList.of(elementTag).and(lengthTag));
PCollectionTuple outputTuple = input.apply(pardo);
- CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now());
+ CommittedBundle<String> inputBundle =
+ bundleFactory.createRootBundle(input).commit(Instant.now());
PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
PCollection<String> elementOutput = outputTuple.get(elementTag);
PCollection<Integer> lengthOutput = outputTuple.get(lengthTag);
InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
- UncommittedBundle<KV<String, Integer>> mainOutputBundle = InProcessBundle.unkeyed(mainOutput);
- UncommittedBundle<String> elementOutputBundle = InProcessBundle.unkeyed(elementOutput);
- UncommittedBundle<Integer> lengthOutputBundle = InProcessBundle.unkeyed(lengthOutput);
+ UncommittedBundle<KV<String, Integer>> mainOutputBundle =
+ bundleFactory.createRootBundle(mainOutput);
+ UncommittedBundle<String> elementOutputBundle = bundleFactory.createRootBundle(elementOutput);
+ UncommittedBundle<Integer> lengthOutputBundle = bundleFactory.createRootBundle(lengthOutput);
when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
when(evaluationContext.createBundle(inputBundle, elementOutput))
@@ -114,8 +120,9 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
when(evaluationContext.createCounterSet()).thenReturn(counters);
com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluator<String> evaluator =
- new ParDoMultiEvaluatorFactory().forApplication(
- mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext);
+ new ParDoMultiEvaluatorFactory()
+ .forApplication(
+ mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext);
evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
evaluator.processElement(
@@ -163,24 +170,28 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
final TupleTag<Integer> lengthTag = new TupleTag<>();
BoundMulti<String, KV<String, Integer>> pardo =
- ParDo.of(new DoFn<String, KV<String, Integer>>() {
- @Override
- public void processElement(ProcessContext c) {
- c.output(KV.<String, Integer>of(c.element(), c.element().length()));
- c.sideOutput(elementTag, c.element());
- c.sideOutput(lengthTag, c.element().length());
- }
- }).withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
+ ParDo.of(
+ new DoFn<String, KV<String, Integer>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(KV.<String, Integer>of(c.element(), c.element().length()));
+ c.sideOutput(elementTag, c.element());
+ c.sideOutput(lengthTag, c.element().length());
+ }
+ })
+ .withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
PCollectionTuple outputTuple = input.apply(pardo);
- CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now());
+ CommittedBundle<String> inputBundle =
+ bundleFactory.createRootBundle(input).commit(Instant.now());
PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
PCollection<String> elementOutput = outputTuple.get(elementTag);
InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
- UncommittedBundle<KV<String, Integer>> mainOutputBundle = InProcessBundle.unkeyed(mainOutput);
- UncommittedBundle<String> elementOutputBundle = InProcessBundle.unkeyed(elementOutput);
+ UncommittedBundle<KV<String, Integer>> mainOutputBundle =
+ bundleFactory.createRootBundle(mainOutput);
+ UncommittedBundle<String> elementOutputBundle = bundleFactory.createRootBundle(elementOutput);
when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
when(evaluationContext.createBundle(inputBundle, elementOutput))
@@ -194,8 +205,9 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
when(evaluationContext.createCounterSet()).thenReturn(counters);
com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluator<String> evaluator =
- new ParDoMultiEvaluatorFactory().forApplication(
- mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext);
+ new ParDoMultiEvaluatorFactory()
+ .forApplication(
+ mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext);
evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
evaluator.processElement(
@@ -206,8 +218,7 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
InProcessTransformResult result = evaluator.finishBundle();
assertThat(
result.getOutputBundles(),
- Matchers.<UncommittedBundle<?>>containsInAnyOrder(
- mainOutputBundle, elementOutputBundle));
+ Matchers.<UncommittedBundle<?>>containsInAnyOrder(mainOutputBundle, elementOutputBundle));
assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
assertThat(result.getCounters(), equalTo(counters));
@@ -261,14 +272,16 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
.withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
PCollectionTuple outputTuple = input.apply(pardo);
- CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now());
+ CommittedBundle<String> inputBundle =
+ bundleFactory.createRootBundle(input).commit(Instant.now());
PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
PCollection<String> elementOutput = outputTuple.get(elementTag);
InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
- UncommittedBundle<KV<String, Integer>> mainOutputBundle = InProcessBundle.unkeyed(mainOutput);
- UncommittedBundle<String> elementOutputBundle = InProcessBundle.unkeyed(elementOutput);
+ UncommittedBundle<KV<String, Integer>> mainOutputBundle =
+ bundleFactory.createRootBundle(mainOutput);
+ UncommittedBundle<String> elementOutputBundle = bundleFactory.createRootBundle(elementOutput);
when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
when(evaluationContext.createBundle(inputBundle, elementOutput))
@@ -282,8 +295,9 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
when(evaluationContext.createCounterSet()).thenReturn(counters);
com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluator<String> evaluator =
- new ParDoMultiEvaluatorFactory().forApplication(
- mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext);
+ new ParDoMultiEvaluatorFactory()
+ .forApplication(
+ mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext);
evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
evaluator.processElement(
@@ -368,14 +382,16 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
.withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
PCollectionTuple outputTuple = input.apply(pardo);
- CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now());
+ CommittedBundle<String> inputBundle =
+ bundleFactory.createRootBundle(input).commit(Instant.now());
PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
PCollection<String> elementOutput = outputTuple.get(elementTag);
InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
- UncommittedBundle<KV<String, Integer>> mainOutputBundle = InProcessBundle.unkeyed(mainOutput);
- UncommittedBundle<String> elementOutputBundle = InProcessBundle.unkeyed(elementOutput);
+ UncommittedBundle<KV<String, Integer>> mainOutputBundle =
+ bundleFactory.createRootBundle(mainOutput);
+ UncommittedBundle<String> elementOutputBundle = bundleFactory.createRootBundle(elementOutput);
when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
when(evaluationContext.createBundle(inputBundle, elementOutput))
@@ -389,8 +405,9 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
when(evaluationContext.createCounterSet()).thenReturn(counters);
com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluator<String> evaluator =
- new ParDoMultiEvaluatorFactory().forApplication(
- mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext);
+ new ParDoMultiEvaluatorFactory()
+ .forApplication(
+ mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext);
evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
evaluator.processElement(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java
index 9943fd7..704eb2a 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java
@@ -66,21 +66,27 @@ import java.io.Serializable;
*/
@RunWith(JUnit4.class)
public class ParDoSingleEvaluatorFactoryTest implements Serializable {
+ private transient BundleFactory bundleFactory = InProcessBundleFactory.create();
+
@Test
public void testParDoInMemoryTransformEvaluator() throws Exception {
TestPipeline p = TestPipeline.create();
PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
- PCollection<Integer> collection = input.apply(ParDo.of(new DoFn<String, Integer>() {
- @Override public void processElement(ProcessContext c) {
- c.output(c.element().length());
- }
- }));
- CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now());
+ PCollection<Integer> collection =
+ input.apply(
+ ParDo.of(
+ new DoFn<String, Integer>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(c.element().length());
+ }
+ }));
+ CommittedBundle<String> inputBundle =
+ bundleFactory.createRootBundle(input).commit(Instant.now());
InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
- UncommittedBundle<Integer> outputBundle =
- InProcessBundle.unkeyed(collection);
+ UncommittedBundle<Integer> outputBundle = bundleFactory.createRootBundle(collection);
when(evaluationContext.createBundle(inputBundle, collection)).thenReturn(outputBundle);
InProcessExecutionContext executionContext =
new InProcessExecutionContext(null, null, null, null);
@@ -90,8 +96,9 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
when(evaluationContext.createCounterSet()).thenReturn(counters);
com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluator<String> evaluator =
- new ParDoSingleEvaluatorFactory().forApplication(
- collection.getProducingTransformInternal(), inputBundle, evaluationContext);
+ new ParDoSingleEvaluatorFactory()
+ .forApplication(
+ collection.getProducingTransformInternal(), inputBundle, evaluationContext);
evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
evaluator.processElement(
@@ -118,16 +125,20 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
final TupleTag<Integer> sideOutputTag = new TupleTag<Integer>() {};
- PCollection<Integer> collection = input.apply(ParDo.of(new DoFn<String, Integer>() {
- @Override public void processElement(ProcessContext c) {
- c.sideOutput(sideOutputTag, c.element().length());
- }
- }));
- CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now());
+ PCollection<Integer> collection =
+ input.apply(
+ ParDo.of(
+ new DoFn<String, Integer>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.sideOutput(sideOutputTag, c.element().length());
+ }
+ }));
+ CommittedBundle<String> inputBundle =
+ bundleFactory.createRootBundle(input).commit(Instant.now());
InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
- UncommittedBundle<Integer> outputBundle =
- InProcessBundle.unkeyed(collection);
+ UncommittedBundle<Integer> outputBundle = bundleFactory.createRootBundle(collection);
when(evaluationContext.createBundle(inputBundle, collection)).thenReturn(outputBundle);
InProcessExecutionContext executionContext =
new InProcessExecutionContext(null, null, null, null);
@@ -137,8 +148,9 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
when(evaluationContext.createCounterSet()).thenReturn(counters);
TransformEvaluator<String> evaluator =
- new ParDoSingleEvaluatorFactory().forApplication(
- collection.getProducingTransformInternal(), inputBundle, evaluationContext);
+ new ParDoSingleEvaluatorFactory()
+ .forApplication(
+ collection.getProducingTransformInternal(), inputBundle, evaluationContext);
evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
evaluator.processElement(
@@ -183,10 +195,12 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
});
PCollection<KV<String, Integer>> mainOutput = input.apply(pardo);
- CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now());
+ CommittedBundle<String> inputBundle =
+ bundleFactory.createRootBundle(input).commit(Instant.now());
InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
- UncommittedBundle<KV<String, Integer>> mainOutputBundle = InProcessBundle.unkeyed(mainOutput);
+ UncommittedBundle<KV<String, Integer>> mainOutputBundle =
+ bundleFactory.createRootBundle(mainOutput);
when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
@@ -246,42 +260,44 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
ParDo.Bound<String, KV<String, Integer>> pardo =
ParDo.of(
- new DoFn<String, KV<String, Integer>>() {
- @Override
- public void processElement(ProcessContext c) {
- c.windowingInternals().stateInternals();
- c.windowingInternals()
- .timerInternals()
- .setTimer(
- TimerData.of(
- StateNamespaces.window(
- IntervalWindow.getCoder(),
- new IntervalWindow(
- new Instant(0).plus(Duration.standardMinutes(5)),
- new Instant(1)
- .plus(Duration.standardMinutes(5))
- .plus(Duration.standardHours(1)))),
- new Instant(54541L),
- TimeDomain.EVENT_TIME));
- c.windowingInternals()
- .timerInternals()
- .deleteTimer(
- TimerData.of(
- StateNamespaces.window(
- IntervalWindow.getCoder(),
- new IntervalWindow(
- new Instant(0),
- new Instant(0).plus(Duration.standardHours(1)))),
- new Instant(3400000),
- TimeDomain.SYNCHRONIZED_PROCESSING_TIME));
- }
- });
+ new DoFn<String, KV<String, Integer>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.windowingInternals().stateInternals();
+ c.windowingInternals()
+ .timerInternals()
+ .setTimer(
+ TimerData.of(
+ StateNamespaces.window(
+ IntervalWindow.getCoder(),
+ new IntervalWindow(
+ new Instant(0).plus(Duration.standardMinutes(5)),
+ new Instant(1)
+ .plus(Duration.standardMinutes(5))
+ .plus(Duration.standardHours(1)))),
+ new Instant(54541L),
+ TimeDomain.EVENT_TIME));
+ c.windowingInternals()
+ .timerInternals()
+ .deleteTimer(
+ TimerData.of(
+ StateNamespaces.window(
+ IntervalWindow.getCoder(),
+ new IntervalWindow(
+ new Instant(0),
+ new Instant(0).plus(Duration.standardHours(1)))),
+ new Instant(3400000),
+ TimeDomain.SYNCHRONIZED_PROCESSING_TIME));
+ }
+ });
PCollection<KV<String, Integer>> mainOutput = input.apply(pardo);
- CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now());
+ CommittedBundle<String> inputBundle =
+ bundleFactory.createRootBundle(input).commit(Instant.now());
InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
- UncommittedBundle<KV<String, Integer>> mainOutputBundle = InProcessBundle.unkeyed(mainOutput);
+ UncommittedBundle<KV<String, Integer>> mainOutputBundle =
+ bundleFactory.createRootBundle(mainOutput);
when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
@@ -303,10 +319,7 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
assertThat(
result.getTimerUpdate(),
equalTo(
- TimerUpdate.builder("myKey")
- .setTimer(addedTimer)
- .deletedTimer(deletedTimer)
- .build()));
+ TimerUpdate.builder("myKey").setTimer(addedTimer).deletedTimer(deletedTimer).build()));
}
}